Merge pull request #19 from galal-hussein/add_remove

Add/Remove Control plane and Worker hosts

Commit 5dfe2a0fc8
@@ -6,12 +6,14 @@ import (
 	"path/filepath"
 
 	"github.com/rancher/rke/hosts"
+	"github.com/rancher/rke/k8s"
 	"github.com/rancher/rke/pki"
 	"github.com/rancher/rke/services"
 	"github.com/rancher/types/apis/cluster.cattle.io/v1"
 	"github.com/sirupsen/logrus"
 	yaml "gopkg.in/yaml.v2"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/util/cert"
 )
 
 type Cluster struct {
@@ -115,3 +117,61 @@ func GetLocalKubeConfig(configPath string) string {
 	baseDir += "/"
 	return fmt.Sprintf("%s%s%s", baseDir, pki.KubeAdminConfigPrefix, fileName)
 }
+
+func ReconcileCluster(kubeCluster, currentCluster *Cluster) error {
+	logrus.Infof("[reconcile] Reconciling cluster state")
+	if currentCluster == nil {
+		logrus.Infof("[reconcile] This is newly generated cluster")
+		return nil
+	}
+	if err := rebuildLocalAdminConfig(kubeCluster); err != nil {
+		return err
+	}
+	kubeClient, err := k8s.NewClient(kubeCluster.LocalKubeConfigPath)
+	if err != nil {
+		return fmt.Errorf("Failed to initialize new kubernetes client: %v", err)
+	}
+
+	logrus.Infof("[reconcile] Check Control plane hosts to be deleted")
+	cpToDelete := hosts.GetToDeleteHosts(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts)
+	for _, toDeleteHost := range cpToDelete {
+		hosts.DeleteNode(&toDeleteHost, kubeClient)
+	}
+
+	logrus.Infof("[reconcile] Check worker hosts to be deleted")
+	wpToDelete := hosts.GetToDeleteHosts(currentCluster.WorkerHosts, kubeCluster.WorkerHosts)
+	for _, toDeleteHost := range wpToDelete {
+		hosts.DeleteNode(&toDeleteHost, kubeClient)
+	}
+
+	// Rolling update on change for nginx Proxy
+	cpChanged := hosts.IsHostListChanged(currentCluster.ControlPlaneHosts, kubeCluster.ControlPlaneHosts)
+	if cpChanged {
+		logrus.Infof("[reconcile] Rolling update nginx hosts with new list of control plane hosts")
+		err = services.RollingUpdateNginxProxy(kubeCluster.ControlPlaneHosts, kubeCluster.WorkerHosts)
+		if err != nil {
+			return fmt.Errorf("Failed to rolling update Nginx hosts with new control plane hosts")
+		}
+	}
+	logrus.Infof("[reconcile] Reconciled cluster state successfully")
+	return nil
+}
+
+func rebuildLocalAdminConfig(kubeCluster *Cluster) error {
+	logrus.Infof("[reconcile] Rebuilding and update local kube config")
+	currentKubeConfig := kubeCluster.Certificates[pki.KubeAdminCommonName]
+	caCrt := kubeCluster.Certificates[pki.CACertName].Certificate
+	newConfig := pki.GetKubeConfigX509WithData(
+		"https://"+kubeCluster.ControlPlaneHosts[0].IP+":6443",
+		pki.KubeAdminCommonName,
+		string(cert.EncodeCertPEM(caCrt)),
+		string(cert.EncodeCertPEM(currentKubeConfig.Certificate)),
+		string(cert.EncodePrivateKeyPEM(currentKubeConfig.Key)))
+	err := pki.DeployAdminConfig(newConfig, kubeCluster.LocalKubeConfigPath)
+	if err != nil {
+		return fmt.Errorf("Failed to redeploy local admin config with new host")
+	}
+	currentKubeConfig.Config = newConfig
+	kubeCluster.Certificates[pki.KubeAdminCommonName] = currentKubeConfig
+	return nil
+}
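ReconcileCluster drives deletions from a list diff: any host present in the last saved state but missing from the new config gets cordoned and deleted. A minimal, self-contained sketch of that selection logic with stand-in types (illustrative only, not the rke types themselves):

package main

import "fmt"

// host stands in for rke's hosts.Host; only the hostname matters here.
type host struct{ hostname string }

// toDelete mirrors hosts.GetToDeleteHosts: hosts recorded in the current
// state but absent from the new config are scheduled for removal.
func toDelete(current, desired []host) []host {
	var out []host
	for _, c := range current {
		found := false
		for _, d := range desired {
			if c.hostname == d.hostname {
				found = true
			}
		}
		if !found {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	current := []host{{"node-1"}, {"node-2"}, {"node-3"}}
	desired := []host{{"node-1"}, {"node-3"}}
	for _, h := range toDelete(current, desired) {
		// In ReconcileCluster this step is hosts.DeleteNode: cordon, then delete.
		fmt.Println("would cordon and delete:", h.hostname)
	}
}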
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"time"
 
-	"github.com/rancher/rke/hosts"
 	"github.com/rancher/rke/k8s"
 	"github.com/sirupsen/logrus"
 	yaml "gopkg.in/yaml.v2"
@@ -48,10 +47,6 @@ func (c *Cluster) GetClusterState() (*Cluster, error) {
 		if err != nil {
 			return nil, fmt.Errorf("Failed to classify hosts from fetched cluster: %v", err)
 		}
-		err = hosts.ReconcileWorkers(currentCluster.WorkerHosts, c.WorkerHosts, c.KubeClient)
-		if err != nil {
-			return nil, fmt.Errorf("Failed to reconcile hosts: %v", err)
-		}
 		}
 	}
 	return currentCluster, nil
@@ -80,6 +80,7 @@ func ClusterUp(clusterFile string) (string, string, string, string, error) {
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, err
 	}
+
 	err = kubeCluster.SetUpHosts()
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, err
@@ -90,6 +91,11 @@ func ClusterUp(clusterFile string) (string, string, string, string, error) {
 		return APIURL, caCrt, clientCert, clientKey, err
 	}
 
+	err = cluster.ReconcileCluster(kubeCluster, currentCluster)
+	if err != nil {
+		return APIURL, caCrt, clientCert, clientKey, err
+	}
+
 	err = kubeCluster.SaveClusterState(clusterFile)
 	if err != nil {
 		return APIURL, caCrt, clientCert, clientKey, err
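With this change, ClusterUp runs reconciliation after host setup and service deployment but before the cluster state is saved. A hedged sketch of a caller consuming ClusterUp's return values; the file handling here is an assumption, since this diff shows only the signature, not how the CLI produces the clusterFile argument:

package main

import (
	"io/ioutil"

	"github.com/rancher/rke/cluster"
	"github.com/sirupsen/logrus"
)

func main() {
	// Assumption: ClusterUp takes the cluster file's YAML content.
	b, err := ioutil.ReadFile("cluster.yml")
	if err != nil {
		logrus.Fatalf("Failed to read cluster file: %v", err)
	}
	APIURL, caCrt, clientCert, clientKey, err := cluster.ClusterUp(string(b))
	if err != nil {
		logrus.Fatalf("Cluster up failed: %v", err)
	}
	logrus.Infof("Kubernetes API at %s (CA %d bytes, cert %d bytes, key %d bytes)",
		APIURL, len(caCrt), len(clientCert), len(clientKey))
}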
@@ -109,6 +115,8 @@ func ClusterUp(clusterFile string) (string, string, string, string, error) {
 	caCrt = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.CACertName].Certificate))
 	clientCert = string(cert.EncodeCertPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Certificate))
 	clientKey = string(cert.EncodePrivateKeyPEM(kubeCluster.Certificates[pki.KubeAdminCommonName].Key))
+
+	logrus.Infof("Finished building Kubernetes cluster successfully")
 	return APIURL, caCrt, clientCert, clientKey, nil
 }
 
@@ -40,6 +40,35 @@ func DoRunContainer(dClient *client.Client, imageCfg *container.Config, hostCfg
 	return nil
 }
 
+func DoRollingUpdateContainer(dClient *client.Client, imageCfg *container.Config, hostCfg *container.HostConfig, containerName, hostname, plane string) error {
+	logrus.Debugf("[%s] Checking for deployed %s", plane, containerName)
+	isRunning, err := IsContainerRunning(dClient, hostname, containerName)
+	if err != nil {
+		return err
+	}
+	if !isRunning {
+		logrus.Infof("[%s] Container %s is not running on host [%s]", plane, containerName, hostname)
+		return nil
+	}
+	logrus.Debugf("[%s] Stopping old container", plane)
+	oldContainerName := "old-" + containerName
+	if err := StopRenameContainer(dClient, hostname, containerName, oldContainerName); err != nil {
+		return err
+	}
+	logrus.Infof("[%s] Successfully stopped old container %s on host [%s]", plane, containerName, hostname)
+	_, err = CreateContiner(dClient, hostname, containerName, imageCfg, hostCfg)
+	if err != nil {
+		return fmt.Errorf("Failed to create %s container on host [%s]: %v", containerName, hostname, err)
+	}
+	if err := StartContainer(dClient, hostname, containerName); err != nil {
+		return fmt.Errorf("Failed to start %s container on host [%s]: %v", containerName, hostname, err)
+	}
+	logrus.Infof("[%s] Successfully updated %s container on host [%s]", plane, containerName, hostname)
+	logrus.Debugf("[%s] Removing old container", plane)
+	err = RemoveContainer(dClient, hostname, oldContainerName)
+	return err
+}
+
 func IsContainerRunning(dClient *client.Client, hostname string, containerName string) (bool, error) {
 	logrus.Debugf("Checking if container %s is running on host [%s]", containerName, hostname)
 	containers, err := dClient.ContainerList(context.Background(), types.ContainerListOptions{})
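DoRollingUpdateContainer swaps a container in place: stop and rename the running container, create and start the replacement under the original name, then remove the renamed original (CreateContiner is the repo's own spelling). A toy model of that ordering against an in-memory stand-in for the Docker client, not the Docker API itself:

package main

import "fmt"

// runtime is an in-memory stand-in for the Docker client used above;
// the map tracks container name -> running.
type runtime struct{ containers map[string]bool }

func (r *runtime) stopRename(name, newName string) {
	delete(r.containers, name)
	r.containers[newName] = false // stopped, kept around until the swap succeeds
}

func (r *runtime) createAndStart(name string) { r.containers[name] = true }
func (r *runtime) remove(name string)         { delete(r.containers, name) }

func main() {
	rt := &runtime{containers: map[string]bool{"nginx-proxy": true}}

	name := "nginx-proxy"
	oldName := "old-" + name // same prefix scheme as DoRollingUpdateContainer

	// The same three-step sequence as above:
	rt.stopRename(name, oldName) // 1. StopRenameContainer
	rt.createAndStart(name)      // 2. CreateContiner + StartContainer
	rt.remove(oldName)           // 3. RemoveContainer

	fmt.Println(rt.containers) // map[nginx-proxy:true]
}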
@@ -13,29 +13,60 @@ type Host struct {
 	DClient *client.Client
 }
 
-func ReconcileWorkers(currentWorkers []Host, newWorkers []Host, kubeClient *kubernetes.Clientset) error {
-	for _, currentWorker := range currentWorkers {
+func DeleteNode(toDeleteHost *Host, kubeClient *kubernetes.Clientset) error {
+	logrus.Infof("[hosts] Cordoning host [%s]", toDeleteHost.AdvertisedHostname)
+	err := k8s.CordonUncordon(kubeClient, toDeleteHost.AdvertisedHostname, true)
+	if err != nil {
+		return nil
+	}
+	logrus.Infof("[hosts] Deleting host [%s] from the cluster", toDeleteHost.AdvertisedHostname)
+	err = k8s.DeleteNode(kubeClient, toDeleteHost.AdvertisedHostname)
+	if err != nil {
+		return err
+	}
+	logrus.Infof("[hosts] Successfully deleted host [%s] from the cluster", toDeleteHost.AdvertisedHostname)
+	return nil
+}
+
+func GetToDeleteHosts(currentHosts, configHosts []Host) []Host {
+	toDeleteHosts := []Host{}
+	for _, currentHost := range currentHosts {
 		found := false
-		for _, newWorker := range newWorkers {
-			if currentWorker.AdvertisedHostname == newWorker.AdvertisedHostname {
+		for _, newHost := range configHosts {
+			if currentHost.AdvertisedHostname == newHost.AdvertisedHostname {
 				found = true
 			}
 		}
 		if !found {
-			if err := deleteWorkerNode(&currentWorker, kubeClient); err != nil {
-				return err
+			toDeleteHosts = append(toDeleteHosts, currentHost)
 		}
 	}
-	}
-	return nil
+	return toDeleteHosts
 }
 
-func deleteWorkerNode(workerNode *Host, kubeClient *kubernetes.Clientset) error {
-	logrus.Infof("[hosts] Deleting host [%s] from the cluster", workerNode.AdvertisedHostname)
-	err := k8s.DeleteNode(kubeClient, workerNode.AdvertisedHostname)
-	if err != nil {
-		return err
+func IsHostListChanged(currentHosts, configHosts []Host) bool {
+	changed := false
+	for _, host := range currentHosts {
+		found := false
+		for _, configHost := range configHosts {
+			if host.AdvertisedHostname == configHost.AdvertisedHostname {
+				found = true
+			}
 		}
-	logrus.Infof("[hosts] Successfully deleted host [%s] from the cluster", workerNode.AdvertisedHostname)
-	return nil
+		if !found {
+			return true
+		}
+	}
+	for _, host := range configHosts {
+		found := false
+		for _, currentHost := range currentHosts {
+			if host.AdvertisedHostname == currentHost.AdvertisedHostname {
+				found = true
+			}
+		}
+		if !found {
+			return true
+		}
+	}
+	return changed
 }
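IsHostListChanged has to scan in both directions: a host removed from the config and a host added to it both count as a change (as written, changed stays false throughout, so the final return changed is effectively return false). A small runnable check of both directions with a local Host type:

package main

import "fmt"

type Host struct{ AdvertisedHostname string }

// contains reports whether list has a host with the given hostname.
func contains(list []Host, name string) bool {
	for _, h := range list {
		if h.AdvertisedHostname == name {
			return true
		}
	}
	return false
}

// changed mirrors IsHostListChanged: any host missing from either
// side relative to the other means the list changed.
func changed(current, config []Host) bool {
	for _, h := range current {
		if !contains(config, h.AdvertisedHostname) {
			return true // host was removed from the config
		}
	}
	for _, h := range config {
		if !contains(current, h.AdvertisedHostname) {
			return true // host was added to the config
		}
	}
	return false
}

func main() {
	a := []Host{{"node-1"}, {"node-2"}}
	b := []Host{{"node-1"}, {"node-2"}, {"node-3"}}
	fmt.Println(changed(a, a)) // false
	fmt.Println(changed(a, b)) // true: node-3 added
	fmt.Println(changed(b, a)) // true: node-3 removed
}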
main.go (2 changes)
@@ -8,7 +8,7 @@ import (
 	"github.com/urfave/cli"
 )
 
-var VERSION = "v0.1.0-dev"
+var VERSION = "v0.0.2-dev"
 
 func main() {
 	if err := mainErr(); err != nil {
@@ -24,7 +24,7 @@ users:
     client-key: ` + keyPath + ``
 }
 
-func getKubeConfigX509WithData(kubernetesURL string, componentName string, cacrt string, crt string, key string) string {
+func GetKubeConfigX509WithData(kubernetesURL string, componentName string, cacrt string, crt string, key string) string {
 	return `apiVersion: v1
 kind: Config
 clusters:
@@ -166,7 +166,7 @@ func generateCerts(cpHosts []hosts.Host, clusterDomain, localConfigPath string,
 	certs[KubeAdminCommonName] = CertificatePKI{
 		Certificate: kubeAdminCrt,
 		Key:         kubeAdminKey,
-		Config: getKubeConfigX509WithData(
+		Config: GetKubeConfigX509WithData(
 			"https://"+cpHosts[0].IP+":6443",
 			KubeAdminCommonName,
 			string(cert.EncodeCertPEM(caCrt)),
@@ -13,6 +13,15 @@ const (
 	NginxProxyEnvName = "CP_HOSTS"
 )
 
+func RollingUpdateNginxProxy(cpHosts []hosts.Host, workerHosts []hosts.Host) error {
+	nginxProxyEnv := buildProxyEnv(cpHosts)
+	for _, host := range workerHosts {
+		imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv)
+		return docker.DoRollingUpdateContainer(host.DClient, imageCfg, hostCfg, NginxProxyContainerName, host.AdvertisedHostname, WorkerRole)
+	}
+	return nil
+}
+
 func runNginxProxy(host hosts.Host, cpHosts []hosts.Host) error {
 	nginxProxyEnv := buildProxyEnv(cpHosts)
 	imageCfg, hostCfg := buildNginxProxyConfig(host, nginxProxyEnv)
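Note that RollingUpdateNginxProxy returns DoRollingUpdateContainer's result from inside the loop, so only the first worker host is updated before the function exits. A sketch of a variant that visits every worker and propagates the first failure (stubbed update function, not rke code):

package main

import "fmt"

type host struct{ Hostname string }

// updateProxy stands in for docker.DoRollingUpdateContainer.
func updateProxy(h host) error {
	fmt.Println("rolling update nginx-proxy on", h.Hostname)
	return nil
}

// rollingUpdateAll visits every worker instead of returning
// from inside the loop on the first iteration.
func rollingUpdateAll(workers []host) error {
	for _, h := range workers {
		if err := updateProxy(h); err != nil {
			return fmt.Errorf("update on %s failed: %v", h.Hostname, err)
		}
	}
	return nil
}

func main() {
	workers := []host{{"worker-1"}, {"worker-2"}}
	if err := rollingUpdateAll(workers); err != nil {
		fmt.Println(err)
	}
}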