1
0
mirror of https://github.com/rancher/rke.git synced 2025-09-03 16:04:26 +00:00

Use Cluster structure

Use separate cluster package

Save cluster state and certs to kubernetes

Handle Remove and sync cluster state/crts

Reuse kubernetes client and combine image and version

Separate building functions and small fixes
This commit is contained in:
galal-hussein
2017-11-02 12:07:10 +02:00
parent c4677f8ee6
commit 9974d53e57
23 changed files with 735 additions and 323 deletions

View File

@@ -14,25 +14,19 @@ hosts:
services:
etcd:
version: latest
image: quay.io/coreos/etcd
image: quay.io/coreos/etcd:latest
kube-api:
version: v1.7.5_coreos.0
image: quay.io/coreos/hyperkube
image: quay.io/coreos/hyperkube:v1.7.5_coreos.0
service_cluster_ip_range: 10.233.0.0/18
kube-controller:
version: v1.7.5_coreos.0
image: quay.io/coreos/hyperkube
image: quay.io/coreos/hyperkube:v1.7.5_coreos.0
cluster_cidr: 10.233.64.0/18
service_cluster_ip_range: 10.233.0.0/18
scheduler:
version: v1.7.5_coreos.0
image: quay.io/coreos/hyperkube
image: quay.io/coreos/hyperkube:v1.7.5_coreos.0
kubelet:
version: v1.7.5_coreos.0
image: quay.io/coreos/hyperkube
image: quay.io/coreos/hyperkube:v1.7.5_coreos.0
cluster_domain: cluster.local
infra_container_image: gcr.io/google_containers/pause-amd64:3.0
kubeproxy:
version: v1.7.5_coreos.0
image: quay.io/coreos/hyperkube
image: quay.io/coreos/hyperkube:v1.7.5_coreos.0

112
cluster/certificates.go Normal file
View File

@@ -0,0 +1,112 @@
package cluster
import (
"crypto/rsa"
"fmt"
"time"
"github.com/Sirupsen/logrus"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/pki"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/cert"
)
func SetUpAuthentication(kubeCluster, currentCluster *Cluster, authType string) error {
if authType == X509AuthenticationProvider {
var err error
if currentCluster != nil {
kubeCluster.Certificates, err = getClusterCerts(kubeCluster.KClient)
if err != nil {
return fmt.Errorf("Failed to Get Kubernetes certificates: %v", err)
}
} else {
kubeCluster.Certificates, err = pki.StartCertificatesGeneration(
kubeCluster.ControlPlaneHosts,
kubeCluster.WorkerHosts,
kubeCluster.ClusterDomain,
kubeCluster.KubernetesServiceIP)
if err != nil {
return fmt.Errorf("Failed to generate Kubernetes certificates: %v", err)
}
}
}
return nil
}
func getClusterCerts(kClient *kubernetes.Clientset) (map[string]pki.CertificatePKI, error) {
logrus.Infof("[certificates] Getting Cluster certificates from Kubernetes")
certificatesNames := []string{
pki.CACertName,
pki.KubeAPICertName,
pki.KubeNodeName,
pki.KubeProxyName,
pki.KubeControllerName,
pki.KubeSchedulerName,
pki.KubeAdminCommonName,
}
certMap := make(map[string]pki.CertificatePKI)
for _, certName := range certificatesNames {
secret, err := k8s.GetSecret(kClient, certName)
if err != nil {
return nil, err
}
secretCert, _ := cert.ParseCertsPEM(secret.Data["Certificate"])
secretKey, _ := cert.ParsePrivateKeyPEM(secret.Data["Key"])
secretConfig := string(secret.Data["Config"])
certMap[certName] = pki.CertificatePKI{
Certificate: secretCert[0],
Key: secretKey.(*rsa.PrivateKey),
Config: secretConfig,
}
}
logrus.Infof("[certificates] Successfully fetched Cluster certificates from Kubernetes")
return certMap, nil
}
func saveClusterCerts(kClient *kubernetes.Clientset, crts map[string]pki.CertificatePKI) error {
logrus.Infof("[certificates] Save kubernetes certificates as secrets")
for crtName, crt := range crts {
err := saveCertToKubernetes(kClient, crtName, crt)
if err != nil {
return fmt.Errorf("Failed to save certificate [%s] to kubernetes: %v", crtName, err)
}
}
logrus.Infof("[certificates] Successfuly saved certificates as kubernetes secret [%s]", pki.CertificatesSecretName)
return nil
}
func saveCertToKubernetes(kClient *kubernetes.Clientset, crtName string, crt pki.CertificatePKI) error {
logrus.Debugf("[certificates] Saving certificate [%s] to kubernetes", crtName)
timeout := make(chan bool, 1)
go func() {
for {
err := k8s.UpdateSecret(kClient, "Certificate", cert.EncodeCertPEM(crt.Certificate), crtName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
err = k8s.UpdateSecret(kClient, "Key", cert.EncodePrivateKeyPEM(crt.Key), crtName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
if len(crt.Config) > 0 {
err = k8s.UpdateSecret(kClient, "Config", []byte(crt.Config), crtName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
}
timeout <- true
break
}
}()
select {
case <-timeout:
return nil
case <-time.After(time.Second * KubernetesClientTimeOut):
return fmt.Errorf("[certificates] Timeout waiting for kubernetes to be ready")
}
return nil
}

98
cluster/cluster.go Normal file
View File

@@ -0,0 +1,98 @@
package cluster
import (
"fmt"
"net"
"github.com/Sirupsen/logrus"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
yaml "gopkg.in/yaml.v2"
"k8s.io/client-go/kubernetes"
)
type Cluster struct {
Services services.Services `yaml:"services"`
Hosts []hosts.Host `yaml:"hosts"`
EtcdHosts []hosts.Host
WorkerHosts []hosts.Host
ControlPlaneHosts []hosts.Host
KClient *kubernetes.Clientset
KubernetesServiceIP net.IP
Certificates map[string]pki.CertificatePKI
ClusterDomain string
}
const (
X509AuthenticationProvider = "x509"
StateConfigMapName = "cluster-state"
UpdateStateTimeout = 30
GetStateTimeout = 30
KubernetesClientTimeOut = 30
)
func (c *Cluster) DeployClusterPlanes() error {
// Deploy Kubernetes Planes
err := services.RunEtcdPlane(c.EtcdHosts, c.Services.Etcd)
if err != nil {
return fmt.Errorf("[etcd] Failed to bring up Etcd Plane: %v", err)
}
err = services.RunControlPlane(c.ControlPlaneHosts, c.EtcdHosts, c.Services)
if err != nil {
return fmt.Errorf("[controlPlane] Failed to bring up Control Plane: %v", err)
}
err = services.RunWorkerPlane(c.ControlPlaneHosts, c.WorkerHosts, c.Services)
if err != nil {
return fmt.Errorf("[workerPlane] Failed to bring up Worker Plane: %v", err)
}
return nil
}
func ParseConfig(clusterFile string) (*Cluster, error) {
logrus.Debugf("Parsing cluster file [%v]", clusterFile)
var err error
c, err := parseClusterFile(clusterFile)
if err != nil {
return nil, fmt.Errorf("Failed to parse the cluster file: %v", err)
}
err = c.InvertIndexHosts()
if err != nil {
return nil, fmt.Errorf("Failed to classify hosts from config file: %v", err)
}
c.KubernetesServiceIP, err = services.GetKubernetesServiceIp(c.Services.KubeAPI.ServiceClusterIPRange)
if err != nil {
return nil, fmt.Errorf("Failed to get Kubernetes Service IP: %v", err)
}
c.ClusterDomain = c.Services.Kubelet.ClusterDomain
return c, nil
}
func parseClusterFile(clusterFile string) (*Cluster, error) {
// parse hosts
var kubeCluster Cluster
err := yaml.Unmarshal([]byte(clusterFile), &kubeCluster)
if err != nil {
return nil, err
}
for i, host := range kubeCluster.Hosts {
if len(host.Hostname) == 0 {
return nil, fmt.Errorf("Hostname for host (%d) is not provided", i+1)
} else if len(host.User) == 0 {
return nil, fmt.Errorf("User for host (%d) is not provided", i+1)
} else if len(host.Role) == 0 {
return nil, fmt.Errorf("Role for host (%d) is not provided", i+1)
} else if host.AdvertiseAddress == "" {
// if control_plane_ip is not set,
// default to the main IP
kubeCluster.Hosts[i].AdvertiseAddress = host.IP
}
for _, role := range host.Role {
if role != services.ETCDRole && role != services.ControlRole && role != services.WorkerRole {
return nil, fmt.Errorf("Role [%s] for host (%d) is not recognized", role, i+1)
}
}
}
return &kubeCluster, nil
}

74
cluster/hosts.go Normal file
View File

@@ -0,0 +1,74 @@
package cluster
import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
)
func (c *Cluster) TunnelHosts() error {
for i := range c.EtcdHosts {
err := c.EtcdHosts[i].TunnelUp()
if err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Etcd hosts: %v", err)
}
}
for i := range c.ControlPlaneHosts {
err := c.ControlPlaneHosts[i].TunnelUp()
if err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Control hosts: %v", err)
}
}
for i := range c.WorkerHosts {
err := c.WorkerHosts[i].TunnelUp()
if err != nil {
return fmt.Errorf("Failed to set up SSH tunneling for Worker hosts: %v", err)
}
}
return nil
}
func (c *Cluster) InvertIndexHosts() error {
c.EtcdHosts = make([]hosts.Host, 0)
c.WorkerHosts = make([]hosts.Host, 0)
c.ControlPlaneHosts = make([]hosts.Host, 0)
for _, host := range c.Hosts {
for _, role := range host.Role {
logrus.Debugf("Host: " + host.Hostname + " has role: " + role)
switch role {
case services.ETCDRole:
c.EtcdHosts = append(c.EtcdHosts, host)
case services.ControlRole:
c.ControlPlaneHosts = append(c.ControlPlaneHosts, host)
case services.WorkerRole:
c.WorkerHosts = append(c.WorkerHosts, host)
default:
return fmt.Errorf("Failed to recognize host [%s] role %s", host.Hostname, role)
}
}
}
return nil
}
func (c *Cluster) SetUpHosts(authType string) error {
if authType == X509AuthenticationProvider {
logrus.Infof("[certificates] Deploying kubernetes certificates to Cluster nodes")
err := pki.DeployCertificatesOnMasters(c.ControlPlaneHosts, c.Certificates)
if err != nil {
return err
}
err = pki.DeployCertificatesOnWorkers(c.WorkerHosts, c.Certificates)
if err != nil {
return err
}
err = pki.DeployAdminConfig(c.Certificates[pki.KubeAdminCommonName].Config)
if err != nil {
return err
}
logrus.Infof("[certificates] Successfully deployed kubernetes certificates to Cluster nodes")
}
return nil
}

111
cluster/state.go Normal file
View File

@@ -0,0 +1,111 @@
package cluster
import (
"fmt"
"time"
"github.com/Sirupsen/logrus"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/k8s"
"github.com/rancher/rke/pki"
yaml "gopkg.in/yaml.v2"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
)
func (c *Cluster) SaveClusterState(clusterFile string) error {
// Reinitialize kubernetes Client
var err error
c.KClient, err = k8s.NewClient(pki.KubeAdminConfigPath)
if err != nil {
return fmt.Errorf("Failed to re-initialize Kubernetes Client: %v", err)
}
err = saveClusterCerts(c.KClient, c.Certificates)
if err != nil {
return fmt.Errorf("[certificates] Failed to Save Kubernetes certificates: %v", err)
}
err = saveStateToKubernetes(c.KClient, pki.KubeAdminConfigPath, []byte(clusterFile))
if err != nil {
return fmt.Errorf("[state] Failed to save configuration state: %v", err)
}
return nil
}
func (c *Cluster) GetClusterState() (*Cluster, error) {
var err error
var currentCluster *Cluster
c.KClient, err = k8s.NewClient(pki.KubeAdminConfigPath)
if err != nil {
logrus.Warnf("Failed to initiate new Kubernetes Client: %v", err)
} else {
// Handle pervious kubernetes state and certificate generation
currentCluster = getStateFromKubernetes(c.KClient, pki.KubeAdminConfigPath)
if currentCluster != nil {
err = currentCluster.InvertIndexHosts()
if err != nil {
return nil, fmt.Errorf("Failed to classify hosts from fetched cluster: %v", err)
}
err = hosts.ReconcileWorkers(currentCluster.WorkerHosts, c.WorkerHosts, c.KClient)
if err != nil {
return nil, fmt.Errorf("Failed to reconcile hosts: %v", err)
}
}
}
return currentCluster, nil
}
func saveStateToKubernetes(kClient *kubernetes.Clientset, kubeConfigPath string, clusterFile []byte) error {
logrus.Infof("[state] Saving cluster state to Kubernetes")
timeout := make(chan bool, 1)
go func() {
for {
err := k8s.UpdateConfigMap(kClient, clusterFile, StateConfigMapName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
logrus.Infof("[state] Successfully Saved cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
timeout <- true
break
}
}()
select {
case <-timeout:
return nil
case <-time.After(time.Second * UpdateStateTimeout):
return fmt.Errorf("[state] Timeout waiting for kubernetes to be ready")
}
}
func getStateFromKubernetes(kClient *kubernetes.Clientset, kubeConfigPath string) *Cluster {
logrus.Infof("[state] Fetching cluster state from Kubernetes")
var cfgMap *v1.ConfigMap
var currentCluster Cluster
var err error
timeout := make(chan bool, 1)
go func() {
for {
cfgMap, err = k8s.GetConfigMap(kClient, StateConfigMapName)
if err != nil {
time.Sleep(time.Second * 5)
continue
}
logrus.Infof("[state] Successfully Fetched cluster state to Kubernetes ConfigMap: %s", StateConfigMapName)
timeout <- true
break
}
}()
select {
case <-timeout:
clusterData := cfgMap.Data[StateConfigMapName]
err := yaml.Unmarshal([]byte(clusterData), &currentCluster)
if err != nil {
return nil
}
return &currentCluster
case <-time.After(time.Second * GetStateTimeout):
logrus.Warnf("Timed out waiting for kubernetes cluster")
return nil
}
return nil
}

View File

@@ -6,26 +6,26 @@ import (
"os"
"path/filepath"
yaml "gopkg.in/yaml.v2"
"github.com/Sirupsen/logrus"
"github.com/rancher/rke/hosts"
"github.com/rancher/rke/cluster"
"github.com/rancher/rke/pki"
"github.com/rancher/rke/services"
"github.com/urfave/cli"
"k8s.io/client-go/util/cert"
)
func ClusterCommand() cli.Command {
clusterUpFlags := []cli.Flag{
cli.StringFlag{
Name: "cluster-file",
Usage: "Specify an alternate cluster YAML file (default: cluster.yml)",
Usage: "Specify an alternate cluster YAML file",
Value: "cluster.yml",
EnvVar: "CLUSTER_FILE",
},
cli.BoolFlag{
Name: "force-crts",
Usage: "Force rotating the Kubernetes components certificates",
cli.StringFlag{
Name: "auth-type",
Usage: "Specify authentication type",
Value: "x509",
EnvVar: "AUTH_TYPE",
},
}
return cli.Command{
@@ -37,55 +37,65 @@ func ClusterCommand() cli.Command {
cli.Command{
Name: "up",
Usage: "Bring the cluster up",
Action: clusterUp,
Action: clusterUpFromCli,
Flags: clusterUpFlags,
},
},
}
}
func clusterUp(ctx *cli.Context) error {
logrus.Infof("Building up Kubernetes cluster")
func ClusterUp(clusterFile, authType string) (string, string, string, string, error) {
logrus.Infof("Building Kubernetes cluster")
var ApiURL, caCrt, clientCert, clientKey string
kubeCluster, err := cluster.ParseConfig(clusterFile)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.TunnelHosts()
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
}
currentCluster, err := kubeCluster.GetClusterState()
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
}
err = cluster.SetUpAuthentication(kubeCluster, currentCluster, authType)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.SetUpHosts(authType)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.DeployClusterPlanes()
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
}
err = kubeCluster.SaveClusterState(clusterFile)
if err != nil {
return ApiURL, caCrt, clientCert, clientKey, err
}
ApiURL = fmt.Sprintf("https://" + kubeCluster.ControlPlaneHosts[0].IP + ":6443")
caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate))
clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Certificate))
clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Key))
return ApiURL, caCrt, clientCert, clientKey, nil
}
func clusterUpFromCli(ctx *cli.Context) error {
authType := ctx.String("auth-type")
clusterFile, err := resolveClusterFile(ctx)
if err != nil {
return fmt.Errorf("Failed to bring cluster up: %v", err)
return fmt.Errorf("Failed to resolve cluster file: %v", err)
}
logrus.Debugf("Parsing cluster file [%v]", clusterFile)
servicesLookup, k8shosts, err := parseClusterFile(clusterFile)
if err != nil {
return fmt.Errorf("Failed to parse the cluster file: %v", err)
}
for i := range k8shosts {
// Set up socket tunneling
k8shosts[i].TunnelUp(ctx)
defer k8shosts[i].DClient.Close()
if err != nil {
_, _, _, _, err = ClusterUp(clusterFile, authType)
return err
}
}
etcdHosts, cpHosts, workerHosts := hosts.DivideHosts(k8shosts)
KubernetesServiceIP, err := services.GetKubernetesServiceIp(servicesLookup.Services.KubeAPI.ServiceClusterIPRange)
clusterDomain := servicesLookup.Services.Kubelet.ClusterDomain
if err != nil {
return err
}
err = pki.StartCertificatesGeneration(ctx, cpHosts, workerHosts, clusterDomain, KubernetesServiceIP)
if err != nil {
return fmt.Errorf("[Certificates] Failed to generate Kubernetes certificates: %v", err)
}
err = services.RunEtcdPlane(etcdHosts, servicesLookup.Services.Etcd)
if err != nil {
return fmt.Errorf("[Etcd] Failed to bring up Etcd Plane: %v", err)
}
err = services.RunControlPlane(cpHosts, etcdHosts, servicesLookup.Services)
if err != nil {
return fmt.Errorf("[ControlPlane] Failed to bring up Control Plane: %v", err)
}
err = services.RunWorkerPlane(cpHosts, workerHosts, servicesLookup.Services)
if err != nil {
return fmt.Errorf("[WorkerPlane] Failed to bring up Worker Plane: %v", err)
}
return nil
}
func resolveClusterFile(ctx *cli.Context) (string, error) {
@@ -107,38 +117,3 @@ func resolveClusterFile(ctx *cli.Context) (string, error) {
return clusterFile, nil
}
func parseClusterFile(clusterFile string) (*services.Container, []hosts.Host, error) {
// parse hosts
k8shosts := hosts.Hosts{}
err := yaml.Unmarshal([]byte(clusterFile), &k8shosts)
if err != nil {
return nil, nil, err
}
for i, host := range k8shosts.Hosts {
if len(host.Hostname) == 0 {
return nil, nil, fmt.Errorf("Hostname for host (%d) is not provided", i+1)
} else if len(host.User) == 0 {
return nil, nil, fmt.Errorf("User for host (%d) is not provided", i+1)
} else if len(host.Role) == 0 {
return nil, nil, fmt.Errorf("Role for host (%d) is not provided", i+1)
} else if host.AdvertiseAddress == "" {
// if control_plane_ip is not set,
// default to the main IP
k8shosts.Hosts[i].AdvertiseAddress = host.IP
}
for _, role := range host.Role {
if role != services.ETCDRole && role != services.ControlRole && role != services.WorkerRole {
return nil, nil, fmt.Errorf("Role [%s] for host (%d) is not recognized", role, i+1)
}
}
}
// parse services
var servicesContainer services.Container
err = yaml.Unmarshal([]byte(clusterFile), &servicesContainer)
if err != nil {
return nil, nil, err
}
return &servicesContainer, k8shosts.Hosts, nil
}

View File

@@ -10,41 +10,41 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/rancher/rke/hosts"
"github.com/docker/docker/client"
)
func DoRunContainer(imageCfg *container.Config, hostCfg *container.HostConfig, containerName string, host *hosts.Host, plane string) error {
isRunning, err := IsContainerRunning(host, containerName)
func DoRunContainer(dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName string, hostname string, plane string) error {
isRunning, err := IsContainerRunning(dClient, hostname, containerName)
if err != nil {
return err
}
if isRunning {
logrus.Infof("[%s] Container %s is already running on host [%s]", plane, containerName, host.Hostname)
logrus.Infof("[%s] Container %s is already running on host [%s]", plane, containerName, hostname)
return nil
}
logrus.Debugf("[%s] Pulling Image on host [%s]", plane, host.Hostname)
err = PullImage(host, imageCfg.Image)
logrus.Debugf("[%s] Pulling Image on host [%s]", plane, hostname)
err = PullImage(dClient, hostname, imageCfg.Image)
if err != nil {
return err
}
logrus.Infof("[%s] Successfully pulled %s image on host [%s]", plane, containerName, host.Hostname)
resp, err := host.DClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, containerName)
logrus.Infof("[%s] Successfully pulled %s image on host [%s]", plane, containerName, hostname)
resp, err := dClient.ContainerCreate(context.Background(), imageCfg, hostCfg, nil, containerName)
if err != nil {
return fmt.Errorf("Failed to create %s container on host [%s]: %v", containerName, host.Hostname, err)
return fmt.Errorf("Failed to create %s container on host [%s]: %v", containerName, hostname, err)
}
if err := host.DClient.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil {
return fmt.Errorf("Failed to start %s container on host [%s]: %v", containerName, host.Hostname, err)
if err := dClient.ContainerStart(context.Background(), resp.ID, types.ContainerStartOptions{}); err != nil {
return fmt.Errorf("Failed to start %s container on host [%s]: %v", containerName, hostname, err)
}
logrus.Debugf("[%s] Successfully started %s container: %s", plane, containerName, resp.ID)
logrus.Infof("[%s] Successfully started %s container on host [%s]", plane, containerName, host.Hostname)
logrus.Infof("[%s] Successfully started %s container on host [%s]", plane, containerName, hostname)
return nil
}
func IsContainerRunning(host *hosts.Host, containerName string) (bool, error) {
logrus.Debugf("Checking if container %s is running on host [%s]", containerName, host.Hostname)
containers, err := host.DClient.ContainerList(context.Background(), types.ContainerListOptions{})
func IsContainerRunning(dClient *client.Client, hostname string, containerName string) (bool, error) {
logrus.Debugf("Checking if container %s is running on host [%s]", containerName, hostname)
containers, err := dClient.ContainerList(context.Background(), types.ContainerListOptions{})
if err != nil {
return false, fmt.Errorf("Can't get Docker containers for host [%s]: %v", host.Hostname, err)
return false, fmt.Errorf("Can't get Docker containers for host [%s]: %v", hostname, err)
}
for _, container := range containers {
@@ -55,10 +55,10 @@ func IsContainerRunning(host *hosts.Host, containerName string) (bool, error) {
return false, nil
}
func PullImage(host *hosts.Host, containerImage string) error {
out, err := host.DClient.ImagePull(context.Background(), containerImage, types.ImagePullOptions{})
func PullImage(dClient *client.Client, hostname string, containerImage string) error {
out, err := dClient.ImagePull(context.Background(), containerImage, types.ImagePullOptions{})
if err != nil {
return fmt.Errorf("Can't pull Docker image %s for host [%s]: %v", containerImage, host.Hostname, err)
return fmt.Errorf("Can't pull Docker image %s for host [%s]: %v", containerImage, hostname, err)
}
defer out.Close()
if logrus.GetLevel() == logrus.DebugLevel {
@@ -69,3 +69,11 @@ func PullImage(host *hosts.Host, containerImage string) error {
return nil
}
func RemoveContainer(dClient *client.Client, hostname string, containerName string) error {
err := dClient.ContainerRemove(context.Background(), containerName, types.ContainerRemoveOptions{})
if err != nil {
return fmt.Errorf("Can't remove Docker container %s for host [%s]: %v", containerName, hostname, err)
}
return nil
}

View File

@@ -9,7 +9,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/client"
"github.com/urfave/cli"
"golang.org/x/crypto/ssh"
)
@@ -43,7 +42,7 @@ func (d *dialer) Dial(network, addr string) (net.Conn, error) {
return remote, err
}
func (h *Host) TunnelUp(ctx *cli.Context) error {
func (h *Host) TunnelUp() error {
logrus.Infof("[ssh] Start tunnel for host [%s]", h.Hostname)
dialer := &dialer{

View File

@@ -3,12 +3,10 @@ package hosts
import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/client"
"github.com/rancher/rke/k8s"
"k8s.io/client-go/kubernetes"
)
type Hosts struct {
Hosts []Host `yaml:"hosts"`
}
type Host struct {
IP string `yaml:"ip"`
AdvertiseAddress string `yaml:"advertise_address"`
@@ -19,23 +17,29 @@ type Host struct {
DClient *client.Client
}
func DivideHosts(hosts []Host) ([]Host, []Host, []Host) {
etcdHosts := []Host{}
cpHosts := []Host{}
workerHosts := []Host{}
for _, host := range hosts {
for _, role := range host.Role {
logrus.Debugf("Host: " + host.Hostname + " has role: " + role)
if role == "etcd" {
etcdHosts = append(etcdHosts, host)
func ReconcileWorkers(currentWorkers []Host, newWorkers []Host, kClient *kubernetes.Clientset) error {
for _, currentWorker := range currentWorkers {
found := false
for _, newWorker := range newWorkers {
if currentWorker.Hostname == newWorker.Hostname {
found = true
}
if role == "controlplane" {
cpHosts = append(cpHosts, host)
}
if role == "worker" {
workerHosts = append(workerHosts, host)
if !found {
if err := deleteWorkerNode(&currentWorker, kClient); err != nil {
return err
}
}
}
return etcdHosts, cpHosts, workerHosts
return nil
}
func deleteWorkerNode(workerNode *Host, kClient *kubernetes.Clientset) error {
logrus.Infof("[hosts] Deleting host [%s] from the cluster", workerNode.Hostname)
err := k8s.DeleteNode(kClient, workerNode.Hostname)
if err != nil {
return err
}
logrus.Infof("[hosts] Successfully deleted host [%s] from the cluster", workerNode.Hostname)
return nil
}

90
k8s/k8s.go Normal file
View File

@@ -0,0 +1,90 @@
package k8s
import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/clientcmd"
)
func NewClient(kubeConfigPath string) (*kubernetes.Clientset, error) {
// use the current admin kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
if err != nil {
return nil, err
}
K8sClientSet, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return K8sClientSet, nil
}
func UpdateConfigMap(k8sClient *kubernetes.Clientset, configYaml []byte, configMapName string) error {
cfgMap := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
configMapName: string(configYaml),
},
}
if _, err := k8sClient.ConfigMaps(metav1.NamespaceSystem).Create(cfgMap); err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
if _, err := k8sClient.ConfigMaps(metav1.NamespaceSystem).Update(cfgMap); err != nil {
return err
}
}
return nil
}
func GetConfigMap(k8sClient *kubernetes.Clientset, configMapName string) (*v1.ConfigMap, error) {
return k8sClient.ConfigMaps(metav1.NamespaceSystem).Get(configMapName, metav1.GetOptions{})
}
func UpdateSecret(k8sClient *kubernetes.Clientset, fieldName string, secretData []byte, secretName string) error {
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: metav1.NamespaceSystem,
},
Data: map[string][]byte{
fieldName: secretData,
},
}
if _, err := k8sClient.Secrets(metav1.NamespaceSystem).Create(secret); err != nil {
if !apierrors.IsAlreadyExists(err) {
return err
}
// update secret if its already exist
oldSecret, err := k8sClient.Secrets(metav1.NamespaceSystem).Get(secretName, metav1.GetOptions{})
if err != nil {
return err
}
newData := oldSecret.Data
newData[fieldName] = secretData
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: metav1.NamespaceSystem,
},
Data: newData,
}
if _, err := k8sClient.Secrets(metav1.NamespaceSystem).Update(secret); err != nil {
return err
}
}
return nil
}
func GetSecret(k8sClient *kubernetes.Clientset, secretName string) (*v1.Secret, error) {
return k8sClient.Secrets(metav1.NamespaceSystem).Get(secretName, metav1.GetOptions{})
}
func DeleteNode(k8sClient *kubernetes.Clientset, nodeName string) error {
return k8sClient.Nodes().Delete(nodeName, &metav1.DeleteOptions{})
}

View File

@@ -11,6 +11,12 @@ import (
var VERSION = "v0.1.0-dev"
func main() {
if err := mainErr(); err != nil {
logrus.Fatal(err)
}
}
func mainErr() error {
app := cli.NewApp()
app.Name = "rke"
app.Version = VERSION
@@ -32,5 +38,5 @@ func main() {
Usage: "Debug logging",
},
}
app.Run(os.Args)
return app.Run(os.Args)
}

View File

@@ -3,6 +3,7 @@ package pki
const (
CrtDownloaderImage = "husseingalal/crt-downloader:latest"
CrtDownloaderContainer = "cert-deployer"
CertificatesSecretName = "k8s-certs"
CACertName = "kube-ca"
CACertENVName = "KUBE_CA"

View File

@@ -6,7 +6,6 @@ import (
"crypto/x509"
"fmt"
"io/ioutil"
"os"
"time"
"github.com/Sirupsen/logrus"
@@ -19,44 +18,39 @@ import (
func convertCrtToENV(name string, certificate *x509.Certificate) string {
encodedCrt := cert.EncodeCertPEM(certificate)
return name + "=" + string(encodedCrt)
return fmt.Sprintf("%s=%s", name, string(encodedCrt))
}
func convertKeyToENV(name string, key *rsa.PrivateKey) string {
encodedKey := cert.EncodePrivateKeyPEM(key)
return name + "=" + string(encodedKey)
return fmt.Sprintf("%s=%s", name, string(encodedKey))
}
func convertConfigToENV(name string, config string) string {
return name + "=" + config
return fmt.Sprintf("%s=%s", name, config)
}
func deployCertificatesOnMasters(cpHosts []hosts.Host, crtMap map[string]CertificatePKI, forceDeploy bool) error {
forceDeployEnv := "FORCE_DEPLOY=false"
if forceDeploy {
forceDeployEnv = "FORCE_DEPLOY=true"
}
func DeployCertificatesOnMasters(cpHosts []hosts.Host, crtMap map[string]CertificatePKI) error {
env := []string{
forceDeployEnv,
convertCrtToENV(CACertENVName, crtMap[CACertName].certificate),
convertKeyToENV(CAKeyENVName, crtMap[CACertName].key),
convertCrtToENV(KubeAPICertENVName, crtMap[KubeAPICertName].certificate),
convertKeyToENV(KubeAPIKeyENVName, crtMap[KubeAPICertName].key),
convertCrtToENV(KubeControllerCertENVName, crtMap[KubeControllerName].certificate),
convertKeyToENV(KubeControllerKeyENVName, crtMap[KubeControllerName].key),
convertConfigToENV(KubeControllerConfigENVName, crtMap[KubeControllerName].config),
convertCrtToENV(KubeSchedulerCertENVName, crtMap[KubeSchedulerName].certificate),
convertKeyToENV(KubeSchedulerKeyENVName, crtMap[KubeSchedulerName].key),
convertConfigToENV(KubeSchedulerConfigENVName, crtMap[KubeSchedulerName].config),
convertCrtToENV(KubeProxyCertENVName, crtMap[KubeProxyName].certificate),
convertKeyToENV(KubeProxyKeyENVName, crtMap[KubeProxyName].key),
convertConfigToENV(KubeProxyConfigENVName, crtMap[KubeProxyName].config),
convertCrtToENV(KubeNodeCertENVName, crtMap[KubeNodeName].certificate),
convertKeyToENV(KubeNodeKeyENVName, crtMap[KubeNodeName].key),
convertConfigToENV(KubeNodeConfigENVName, crtMap[KubeNodeName].config),
convertCrtToENV(CACertENVName, crtMap[CACertName].Certificate),
convertKeyToENV(CAKeyENVName, crtMap[CACertName].Key),
convertCrtToENV(KubeAPICertENVName, crtMap[KubeAPICertName].Certificate),
convertKeyToENV(KubeAPIKeyENVName, crtMap[KubeAPICertName].Key),
convertCrtToENV(KubeControllerCertENVName, crtMap[KubeControllerName].Certificate),
convertKeyToENV(KubeControllerKeyENVName, crtMap[KubeControllerName].Key),
convertConfigToENV(KubeControllerConfigENVName, crtMap[KubeControllerName].Config),
convertCrtToENV(KubeSchedulerCertENVName, crtMap[KubeSchedulerName].Certificate),
convertKeyToENV(KubeSchedulerKeyENVName, crtMap[KubeSchedulerName].Key),
convertConfigToENV(KubeSchedulerConfigENVName, crtMap[KubeSchedulerName].Config),
convertCrtToENV(KubeProxyCertENVName, crtMap[KubeProxyName].Certificate),
convertKeyToENV(KubeProxyKeyENVName, crtMap[KubeProxyName].Key),
convertConfigToENV(KubeProxyConfigENVName, crtMap[KubeProxyName].Config),
convertCrtToENV(KubeNodeCertENVName, crtMap[KubeNodeName].Certificate),
convertKeyToENV(KubeNodeKeyENVName, crtMap[KubeNodeName].Key),
convertConfigToENV(KubeNodeConfigENVName, crtMap[KubeNodeName].Config),
}
for _, host := range cpHosts {
err := doRunDeployer(&host, env)
for i := range cpHosts {
err := doRunDeployer(&cpHosts[i], env)
if err != nil {
return err
}
@@ -64,23 +58,18 @@ func deployCertificatesOnMasters(cpHosts []hosts.Host, crtMap map[string]Certifi
return nil
}
func deployCertificatesOnWorkers(workerHosts []hosts.Host, crtMap map[string]CertificatePKI, forceDeploy bool) error {
forceDeployEnv := "FORCE_DEPLOY=false"
if forceDeploy {
forceDeployEnv = "FORCE_DEPLOY=true"
}
func DeployCertificatesOnWorkers(workerHosts []hosts.Host, crtMap map[string]CertificatePKI) error {
env := []string{
forceDeployEnv,
convertCrtToENV(CACertENVName, crtMap[CACertName].certificate),
convertCrtToENV(KubeProxyCertENVName, crtMap[KubeProxyName].certificate),
convertKeyToENV(KubeProxyKeyENVName, crtMap[KubeProxyName].key),
convertConfigToENV(KubeProxyConfigENVName, crtMap[KubeProxyName].config),
convertCrtToENV(KubeNodeCertENVName, crtMap[KubeNodeName].certificate),
convertKeyToENV(KubeNodeKeyENVName, crtMap[KubeNodeName].key),
convertConfigToENV(KubeNodeConfigENVName, crtMap[KubeNodeName].config),
convertCrtToENV(CACertENVName, crtMap[CACertName].Certificate),
convertCrtToENV(KubeProxyCertENVName, crtMap[KubeProxyName].Certificate),
convertKeyToENV(KubeProxyKeyENVName, crtMap[KubeProxyName].Key),
convertConfigToENV(KubeProxyConfigENVName, crtMap[KubeProxyName].Config),
convertCrtToENV(KubeNodeCertENVName, crtMap[KubeNodeName].Certificate),
convertKeyToENV(KubeNodeKeyENVName, crtMap[KubeNodeName].Key),
convertConfigToENV(KubeNodeConfigENVName, crtMap[KubeNodeName].Config),
}
for _, host := range workerHosts {
err := doRunDeployer(&host, env)
for i := range workerHosts {
err := doRunDeployer(&workerHosts[i], env)
if err != nil {
return err
}
@@ -90,7 +79,7 @@ func deployCertificatesOnWorkers(workerHosts []hosts.Host, crtMap map[string]Cer
func doRunDeployer(host *hosts.Host, containerEnv []string) error {
logrus.Debugf("[certificates] Pulling Certificate downloader Image on host [%s]", host.Hostname)
err := docker.PullImage(host, CrtDownloaderImage)
err := docker.PullImage(host.DClient, host.Hostname, CrtDownloaderImage)
if err != nil {
return err
}
@@ -115,7 +104,7 @@ func doRunDeployer(host *hosts.Host, containerEnv []string) error {
}
logrus.Debugf("[certificates] Successfully started Certificate deployer container: %s", resp.ID)
for {
isDeployerRunning, err := docker.IsContainerRunning(host, CrtDownloaderContainer)
isDeployerRunning, err := docker.IsContainerRunning(host.DClient, host.Hostname, CrtDownloaderContainer)
if err != nil {
return err
}
@@ -130,13 +119,11 @@ func doRunDeployer(host *hosts.Host, containerEnv []string) error {
}
}
func deployAdminConfig(kubeConfig string, forceDeploy bool) error {
func DeployAdminConfig(kubeConfig string) error {
logrus.Debugf("Deploying admin Kubeconfig locally: %s", kubeConfig)
if _, err := os.Stat(KubeAdminConfigPath); os.IsNotExist(err) || forceDeploy {
err := ioutil.WriteFile(KubeAdminConfigPath, []byte(kubeConfig), 0644)
if err != nil {
return fmt.Errorf("Failed to create local admin kubeconfig file: %v", err)
}
}
return nil
}

View File

@@ -8,38 +8,23 @@ import (
"github.com/Sirupsen/logrus"
"github.com/rancher/rke/hosts"
"github.com/urfave/cli"
"k8s.io/client-go/util/cert"
)
type CertificatePKI struct {
certificate *x509.Certificate
key *rsa.PrivateKey
config string
Certificate *x509.Certificate
Key *rsa.PrivateKey
Config string
}
// StartCertificatesGeneration ...
func StartCertificatesGeneration(ctx *cli.Context, cpHosts []hosts.Host, workerHosts []hosts.Host, clusterDomain string, KubernetesServiceIP net.IP) error {
forceDeploy := ctx.Bool("force-crts")
func StartCertificatesGeneration(cpHosts []hosts.Host, workerHosts []hosts.Host, clusterDomain string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) {
logrus.Infof("[certificates] Generating kubernetes certificates")
certs, err := generateCerts(cpHosts, clusterDomain, KubernetesServiceIP)
if err != nil {
return err
return nil, err
}
logrus.Infof("[certificates] Generating admin certificates and kubeconfig")
err = generateAdminCerts(certs, clusterDomain, cpHosts, forceDeploy)
if err != nil {
return err
}
err = deployCertificatesOnMasters(cpHosts, certs, forceDeploy)
if err != nil {
return err
}
err = deployCertificatesOnWorkers(workerHosts, certs, forceDeploy)
if err != nil {
return err
}
return nil
return certs, nil
}
func generateCerts(cpHosts []hosts.Host, clusterDomain string, KubernetesServiceIP net.IP) (map[string]CertificatePKI, error) {
@@ -52,8 +37,8 @@ func generateCerts(cpHosts []hosts.Host, clusterDomain string, KubernetesService
}
logrus.Debugf("[certificates] CA Certificate: %s", string(cert.EncodeCertPEM(caCrt)))
certs[CACertName] = CertificatePKI{
certificate: caCrt,
key: caKey,
Certificate: caCrt,
Key: caKey,
}
// generate API certificate and key
@@ -65,8 +50,8 @@ func generateCerts(cpHosts []hosts.Host, clusterDomain string, KubernetesService
}
logrus.Debugf("[certificates] Kube API Certificate: %s", string(cert.EncodeCertPEM(kubeAPICrt)))
certs[KubeAPICertName] = CertificatePKI{
certificate: kubeAPICrt,
key: kubeAPIKey,
Certificate: kubeAPICrt,
Key: kubeAPIKey,
}
// generate Kube controller-manager certificate and key
@@ -77,9 +62,9 @@ func generateCerts(cpHosts []hosts.Host, clusterDomain string, KubernetesService
}
logrus.Debugf("[certificates] Kube Controller Certificate: %s", string(cert.EncodeCertPEM(kubeControllerCrt)))
certs[KubeControllerName] = CertificatePKI{
certificate: kubeControllerCrt,
key: kubeControllerKey,
config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeControllerName, CACertPath, KubeControllerCertPath, KubeControllerKeyPath),
Certificate: kubeControllerCrt,
Key: kubeControllerKey,
Config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeControllerName, CACertPath, KubeControllerCertPath, KubeControllerKeyPath),
}
// generate Kube scheduler certificate and key
@@ -90,9 +75,9 @@ func generateCerts(cpHosts []hosts.Host, clusterDomain string, KubernetesService
}
logrus.Debugf("[certificates] Kube Scheduler Certificate: %s", string(cert.EncodeCertPEM(kubeSchedulerCrt)))
certs[KubeSchedulerName] = CertificatePKI{
certificate: kubeSchedulerCrt,
key: kubeSchedulerKey,
config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeSchedulerName, CACertPath, KubeSchedulerCertPath, KubeSchedulerKeyPath),
Certificate: kubeSchedulerCrt,
Key: kubeSchedulerKey,
Config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeSchedulerName, CACertPath, KubeSchedulerCertPath, KubeSchedulerKeyPath),
}
// generate Kube Proxy certificate and key
@@ -103,9 +88,9 @@ func generateCerts(cpHosts []hosts.Host, clusterDomain string, KubernetesService
}
logrus.Debugf("[certificates] Kube Proxy Certificate: %s", string(cert.EncodeCertPEM(kubeProxyCrt)))
certs[KubeProxyName] = CertificatePKI{
certificate: kubeProxyCrt,
key: kubeProxyKey,
config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeProxyName, CACertPath, KubeProxyCertPath, KubeProxyKeyPath),
Certificate: kubeProxyCrt,
Key: kubeProxyKey,
Config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeProxyName, CACertPath, KubeProxyCertPath, KubeProxyKeyPath),
}
// generate Kubelet certificate and key
@@ -116,30 +101,27 @@ func generateCerts(cpHosts []hosts.Host, clusterDomain string, KubernetesService
}
logrus.Debugf("[certificates] Node Certificate: %s", string(cert.EncodeCertPEM(kubeProxyCrt)))
certs[KubeNodeName] = CertificatePKI{
certificate: nodeCrt,
key: nodeKey,
config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeNodeName, CACertPath, KubeNodeCertPath, KubeNodeKeyPath),
Certificate: nodeCrt,
Key: nodeKey,
Config: getKubeConfigX509("https://"+cpHosts[0].AdvertiseAddress+":6443", KubeNodeName, CACertPath, KubeNodeCertPath, KubeNodeKeyPath),
}
return certs, nil
}
func generateAdminCerts(certs map[string]CertificatePKI, clusterDomain string, cpHosts []hosts.Host, forceDeploy bool) error {
// generate API certificate and key
caCrt, caKey := certs[CACertName].certificate, certs[CACertName].key
logrus.Infof("[certificates] Generating admin certificates and kubeconfig")
kubeAdminCrt, kubeAdminKey, err := generateClientCertAndKey(caCrt, caKey, KubeAdminCommonName, []string{KubeAdminOrganizationName})
if err != nil {
return err
return nil, err
}
logrus.Debugf("[certificates] Kube Admin Certificate: %s", string(cert.EncodeCertPEM(kubeAdminCrt)))
kubeAdminConfig := getKubeConfigX509WithData(
logrus.Debugf("[certificates] Admin Certificate: %s", string(cert.EncodeCertPEM(kubeAdminCrt)))
certs[KubeAdminCommonName] = CertificatePKI{
Certificate: kubeAdminCrt,
Key: kubeAdminKey,
Config: getKubeConfigX509WithData(
"https://"+cpHosts[0].IP+":6443",
KubeAdminCommonName,
string(cert.EncodeCertPEM(caCrt)),
string(cert.EncodeCertPEM(kubeAdminCrt)),
string(cert.EncodePrivateKeyPEM(kubeAdminKey)))
err = deployAdminConfig(kubeAdminConfig, forceDeploy)
return err
string(cert.EncodePrivateKeyPEM(kubeAdminKey))),
}
return certs, nil
}
func generateClientCertAndKey(caCrt *x509.Certificate, caKey *rsa.PrivateKey, commonName string, orgs []string) (*x509.Certificate, *rsa.PrivateKey, error) {

View File

@@ -8,16 +8,11 @@ import (
"github.com/rancher/rke/hosts"
)
type Etcd struct {
Version string `yaml:"version"`
Image string `yaml:"image"`
}
func RunEtcdPlane(etcdHosts []hosts.Host, etcdService Etcd) error {
logrus.Infof("[%s] Building up Etcd Plane..", ETCDRole)
for _, host := range etcdHosts {
imageCfg, hostCfg := buildEtcdConfig(host, etcdService)
err := docker.DoRunContainer(imageCfg, hostCfg, EtcdContainerName, &host, ETCDRole)
err := docker.DoRunContainer(host.DClient, imageCfg, hostCfg, EtcdContainerName, host.Hostname, ETCDRole)
if err != nil {
return err
}
@@ -28,7 +23,7 @@ func RunEtcdPlane(etcdHosts []hosts.Host, etcdService Etcd) error {
func buildEtcdConfig(host hosts.Host, etcdService Etcd) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: etcdService.Image + ":" + etcdService.Version,
Image: etcdService.Image,
Cmd: []string{"/usr/local/bin/etcd",
"--name=etcd-" + host.Hostname,
"--data-dir=/etcd-data",

View File

@@ -8,25 +8,15 @@ import (
"github.com/rancher/rke/pki"
)
type KubeAPI struct {
Version string `yaml:"version"`
Image string `yaml:"image"`
ServiceClusterIPRange string `yaml:"service_cluster_ip_range"`
}
func runKubeAPI(host hosts.Host, etcdHosts []hosts.Host, kubeAPIService KubeAPI) error {
etcdConnString := getEtcdConnString(etcdHosts)
imageCfg, hostCfg := buildKubeAPIConfig(host, kubeAPIService, etcdConnString)
err := docker.DoRunContainer(imageCfg, hostCfg, KubeAPIContainerName, &host, ControlRole)
if err != nil {
return err
}
return nil
return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeAPIContainerName, host.Hostname, ControlRole)
}
func buildKubeAPIConfig(host hosts.Host, kubeAPIService KubeAPI, etcdConnString string) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeAPIService.Image + ":" + kubeAPIService.Version,
Image: kubeAPIService.Image,
Cmd: []string{"/hyperkube",
"apiserver",
"--insecure-bind-address=0.0.0.0",

View File

@@ -7,25 +7,14 @@ import (
"github.com/rancher/rke/pki"
)
type KubeController struct {
Version string `yaml:"version"`
Image string `yaml:"image"`
ClusterCIDR string `yaml:"cluster_cidr"`
ServiceClusterIPRange string `yaml:"service_cluster_ip_range"`
}
func runKubeController(host hosts.Host, kubeControllerService KubeController) error {
imageCfg, hostCfg := buildKubeControllerConfig(kubeControllerService)
err := docker.DoRunContainer(imageCfg, hostCfg, KubeControllerContainerName, &host, ControlRole)
if err != nil {
return err
}
return nil
return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeControllerContainerName, host.Hostname, ControlRole)
}
func buildKubeControllerConfig(kubeControllerService KubeController) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeControllerService.Image + ":" + kubeControllerService.Version,
Image: kubeControllerService.Image,
Cmd: []string{"/hyperkube",
"controller-manager",
"--address=0.0.0.0",

View File

@@ -8,25 +8,14 @@ import (
"github.com/rancher/rke/pki"
)
type Kubelet struct {
Version string `yaml:"version"`
Image string `yaml:"image"`
ClusterDomain string `yaml:"cluster_domain"`
InfraContainerImage string `yaml:"infra_container_image"`
}
func runKubelet(host hosts.Host, kubeletService Kubelet, isMaster bool) error {
imageCfg, hostCfg := buildKubeletConfig(host, kubeletService, isMaster)
err := docker.DoRunContainer(imageCfg, hostCfg, KubeletContainerName, &host, WorkerRole)
if err != nil {
return err
}
return nil
return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeletContainerName, host.Hostname, WorkerRole)
}
func buildKubeletConfig(host hosts.Host, kubeletService Kubelet, isMaster bool) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeletService.Image + ":" + kubeletService.Version,
Image: kubeletService.Image,
Cmd: []string{"/hyperkube",
"kubelet",
"--v=2",

View File

@@ -7,23 +7,14 @@ import (
"github.com/rancher/rke/pki"
)
type Kubeproxy struct {
Version string `yaml:"version"`
Image string `yaml:"image"`
}
func runKubeproxy(host hosts.Host, kubeproxyService Kubeproxy) error {
imageCfg, hostCfg := buildKubeproxyConfig(host, kubeproxyService)
err := docker.DoRunContainer(imageCfg, hostCfg, KubeproxyContainerName, &host, WorkerRole)
if err != nil {
return err
}
return nil
return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, KubeproxyContainerName, host.Hostname, WorkerRole)
}
func buildKubeproxyConfig(host hosts.Host, kubeproxyService Kubeproxy) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: kubeproxyService.Image + ":" + kubeproxyService.Version,
Image: kubeproxyService.Image,
Cmd: []string{"/hyperkube",
"proxy",
"--v=2",

View File

@@ -7,23 +7,14 @@ import (
"github.com/rancher/rke/pki"
)
type Scheduler struct {
Version string `yaml:"version"`
Image string `yaml:"image"`
}
func runScheduler(host hosts.Host, schedulerService Scheduler) error {
imageCfg, hostCfg := buildSchedulerConfig(host, schedulerService)
err := docker.DoRunContainer(imageCfg, hostCfg, SchedulerContainerName, &host, ControlRole)
if err != nil {
return err
}
return nil
return docker.DoRunContainer(host.DClient, imageCfg, hostCfg, SchedulerContainerName, host.Hostname, ControlRole)
}
func buildSchedulerConfig(host hosts.Host, schedulerService Scheduler) (*container.Config, *container.HostConfig) {
imageCfg := &container.Config{
Image: schedulerService.Image + ":" + schedulerService.Version,
Image: schedulerService.Image,
Cmd: []string{"/hyperkube",
"scheduler",
"--v=2",

View File

@@ -5,19 +5,6 @@ import (
"net"
)
type Container struct {
Services Services `yaml:"services"`
}
type Services struct {
Etcd Etcd `yaml:"etcd"`
KubeAPI KubeAPI `yaml:"kube-api"`
KubeController KubeController `yaml:"kube-controller"`
Scheduler Scheduler `yaml:"scheduler"`
Kubelet Kubelet `yaml:"kubelet"`
Kubeproxy Kubeproxy `yaml:"kubeproxy"`
}
const (
ETCDRole = "etcd"
ControlRole = "controlplane"

39
services/types.go Normal file
View File

@@ -0,0 +1,39 @@
package services
type Services struct {
Etcd Etcd `yaml:"etcd"`
KubeAPI KubeAPI `yaml:"kube-api"`
KubeController KubeController `yaml:"kube-controller"`
Scheduler Scheduler `yaml:"scheduler"`
Kubelet Kubelet `yaml:"kubelet"`
Kubeproxy Kubeproxy `yaml:"kubeproxy"`
}
type Etcd struct {
Image string `yaml:"image"`
}
type KubeAPI struct {
Image string `yaml:"image"`
ServiceClusterIPRange string `yaml:"service_cluster_ip_range"`
}
type KubeController struct {
Image string `yaml:"image"`
ClusterCIDR string `yaml:"cluster_cidr"`
ServiceClusterIPRange string `yaml:"service_cluster_ip_range"`
}
type Kubelet struct {
Image string `yaml:"image"`
ClusterDomain string `yaml:"cluster_domain"`
InfraContainerImage string `yaml:"infra_container_image"`
}
type Kubeproxy struct {
Image string `yaml:"image"`
}
type Scheduler struct {
Image string `yaml:"image"`
}

View File

@@ -9,7 +9,7 @@ github.com/docker/docker ecf4125b85e0faa57d2739348e0d453c1d24d10c
github.com/docker/distribution 3800056b8832cf6075e78b282ac010131d8687bc
github.com/docker/go-connections 3ede32e2033de7505e6500d6c868c2b9ed9f169d
github.com/docker/go-units 0dadbb0345b35ec7ef35e228dabb8de89a65bf52
golang.org/x/net c73622c77280266305273cb545f54516ced95b93
golang.org/x/net 186fd3fc8194a5e9980a82230d69c1ff7134229f
github.com/opencontainers/go-digest 279bed98673dd5bef374d3b6e4b09e2af76183bf
github.com/gogo/protobuf 117892bf1866fbaa2318c03e50e40564c8845457
github.com/opencontainers/image-spec 7c889fafd04a893f5c5f50b7ab9963d5d64e5242