Add HA p2p support

Fixes: https://github.com/kairos-io/kairos/issues/2

Signed-off-by: Ettore Di Giacinto <mudler@mocaccino.org>
Ettore Di Giacinto
2022-12-06 17:27:29 +01:00
parent f1a76f59d1
commit db3a4bc287
5 changed files with 108 additions and 22 deletions


@@ -97,18 +97,16 @@ func Bootstrap(e *pluggable.Event) pluggable.EventResponse {
 		return ErrorEvent("No network token provided, exiting")
 	}
-	if !providerConfig.Kairos.Hybrid {
+	if !providerConfig.Kairos.Hybrid || providerConfig.Kairos.HybridVPN {
 		logger.Info("Configuring VPN")
 		if err := SetupVPN(services.EdgeVPNDefaultInstance, cfg.APIAddress, "/", true, providerConfig); err != nil {
 			return ErrorEvent("Failed setup VPN: %s", err.Error())
 		}
 	} else {
 		logger.Info("Configuring API")
 		if err := SetupAPI(cfg.APIAddress, "/", true, providerConfig); err != nil {
 			return ErrorEvent("Failed setup VPN: %s", err.Error())
 		}
 	}

 	networkID := "kairos"
@@ -131,7 +129,15 @@ func Bootstrap(e *pluggable.Event) pluggable.EventResponse {
 		service.WithRoles(
 			service.RoleKey{
 				Role:        "master",
-				RoleHandler: p2p.Master(c, providerConfig),
+				RoleHandler: p2p.Master(c, providerConfig, false, false),
+			},
+			service.RoleKey{
+				Role:        "master/clusterinit",
+				RoleHandler: p2p.Master(c, providerConfig, true, true),
+			},
+			service.RoleKey{
+				Role:        "master/ha",
+				RoleHandler: p2p.Master(c, providerConfig, false, true),
 			},
 			service.RoleKey{
 				Role:        "worker",

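Taken together with the master.go changes further down, the role table above now distinguishes three master flavors: "master" is the single, non-HA control plane as before; "master/clusterinit" (clusterInit=true, ha=true) is the node that bootstraps the embedded-etcd cluster; "master/ha" (clusterInit=false, ha=true) are the additional control-plane nodes that join it. On the networking side, my reading of this diff (the commit itself does not spell it out) is: hybrid: false keeps the full VPN mesh as before, hybrid: true serves only the p2p API and relies on KubeVIP, and the new hybrid: true plus hybrid_vpn: true combination brings the VPN up alongside KubeVIP and, per the master.go hunk below, runs flannel over edgevpn0.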

@@ -7,6 +7,7 @@ type Kairos struct {
 	DNS          bool   `yaml:"dns,omitempty"`
 	LogLevel     string `yaml:"loglevel,omitempty"`
 	Hybrid       bool   `yaml:"hybrid,omitempty"`
+	HybridVPN    bool   `yaml:"hybrid_vpn,omitempty"`
 	MinimumNodes int    `yaml:"minimum_nodes,omitempty"`
 }
@@ -25,10 +26,18 @@ type KubeVIP struct {
 	Interface string `yaml:"interface,omitempty"`
 }

+type HA struct {
+	Enable      bool   `yaml:"enable,omitempty"`
+	EmbeddedDB  bool   `yaml:"embedded,omitempty"`
+	ExternalDB  string `yaml:"external_db,omitempty"`
+	MasterNodes int    `yaml:"master_nodes,omitempty"`
+}
+
 type K3s struct {
 	Env         map[string]string `yaml:"env,omitempty"`
 	ReplaceEnv  bool              `yaml:"replace_env,omitempty"`
 	ReplaceArgs bool              `yaml:"replace_args,omitempty"`
 	Args        []string          `yaml:"args,omitempty"`
 	Enabled     bool              `yaml:"enabled,omitempty"`
+	HA          HA                `yaml:"ha,omitempty"`
 }

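Assembled from the yaml tags above, the user-facing configuration for this feature would look roughly like the following sketch (values are illustrative; only hybrid, hybrid_vpn, ha.enable and ha.master_nodes are consumed elsewhere in this diff):

    kairos:
      hybrid: true      # KubeVIP in front of the control plane instead of the full VPN
      hybrid_vpn: true  # new: additionally start the VPN and run flannel over edgevpn0
    k3s:
      enabled: true
      ha:
        enable: true    # leader assigns master/clusterinit and master/ha roles
        master_nodes: 2 # HA masters to schedule on top of the clusterinit node

embedded and external_db are declared in the HA struct but nothing in this commit reads them yet, which matches the "TODO: External DB" left in the scheduler below.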

@@ -18,7 +18,7 @@ import (
 	service "github.com/mudler/edgevpn/api/client/service"
 )

-func propagateMasterData(ip string, c *service.RoleConfig) error {
+func propagateMasterData(ip string, c *service.RoleConfig, clusterInit, ha bool) error {
 	defer func() {
 		// Avoid polluting the API.
 		// The ledger already retries in the background to update the blockchain, but it has
@@ -29,9 +29,29 @@ func propagateMasterData(ip string, c *service.RoleConfig) error {
 	}()

 	// If we are configured as master, always signal our role
-	if err := c.Client.Set("role", c.UUID, "master"); err != nil {
-		c.Logger.Error(err)
-		return err
+	// if err := c.Client.Set("role", c.UUID, "master"); err != nil {
+	// 	c.Logger.Error(err)
+	// 	return err
+	// }
+
+	// if ha && clusterInit {
+	// 	tokenB, err := ioutil.ReadFile("/var/lib/rancher/k3s/server/generated-token")
+	// 	if err != nil {
+	// 		c.Logger.Error(err)
+	// 		return err
+	// 	}
+	// 	nodeToken := string(tokenB)
+	// 	nodeToken = strings.TrimRight(nodeToken, "\n")
+	// 	if nodeToken != "" {
+	// 		err := c.Client.Set("generatedtoken", "token", nodeToken)
+	// 		if err != nil {
+	// 			c.Logger.Error(err)
+	// 		}
+	// 	}
+	// }
+
+	if ha && !clusterInit {
+		return nil
 	}

 	tokenB, err := ioutil.ReadFile("/var/lib/rancher/k3s/server/node-token")
@@ -68,7 +88,7 @@ func propagateMasterData(ip string, c *service.RoleConfig) error {
 	return nil
 }

-func Master(cc *config.Config, pconfig *providerConfig.Config) role.Role {
+func Master(cc *config.Config, pconfig *providerConfig.Config, clusterInit, ha bool) role.Role {
 	return func(c *service.RoleConfig) error {
 		var ip string
@@ -93,7 +113,21 @@ func Master(cc *config.Config, pconfig *providerConfig.Config) role.Role {
 		if role.SentinelExist() {
 			c.Logger.Info("Node already configured, backing off")
-			return propagateMasterData(ip, c)
+			return propagateMasterData(ip, c, clusterInit, ha)
+		}
+
+		var nodeToken, clusterInitIP string
+		if ha && !clusterInit {
+			nodeToken, _ = c.Client.Get("nodetoken", "token")
+			if nodeToken == "" {
+				c.Logger.Info("nodetoken not there still..")
+				return nil
+			}
+			clusterInitIP, _ = c.Client.Get("master", "ip")
+			if clusterInitIP == "" {
+				c.Logger.Info("clusterInitIP not there still..")
+				return nil
+			}
 		}

 		// Configure k3s service to start on edgevpn0
@@ -110,6 +144,15 @@ func Master(cc *config.Config, pconfig *providerConfig.Config) role.Role {
 		}

 		env := map[string]string{}
+
+		// if clusterInit && ha {
+		// 	token := utils.RandStringRunes(36)
+		// 	env["K3S_TOKEN"] = token
+		// 	os.WriteFile("/var/lib/rancher/k3s/server/generated-token", []byte(token), 0600)
+		// }
+
+		if ha && !clusterInit {
+			env["K3S_TOKEN"] = nodeToken
+		}
 		if !k3sConfig.ReplaceEnv {
 			// Override opts with user-supplied
 			for k, v := range k3sConfig.Env {
@@ -131,9 +174,15 @@ func Master(cc *config.Config, pconfig *providerConfig.Config) role.Role {
 			if err := deployKubeVIP(iface, ip, pconfig); err != nil {
 				return fmt.Errorf("failed KubeVIP setup: %w", err)
 			}
+			if pconfig.Kairos.HybridVPN {
+				args = append(args, "--flannel-iface=edgevpn0")
+			}
 		} else {
 			args = []string{"--flannel-iface=edgevpn0"}
 		}

+		if ha && !clusterInit {
+			args = append(args, fmt.Sprintf("--server=https://%s:6443", clusterInitIP))
+		}
 		if k3sConfig.ReplaceArgs {
 			args = k3sConfig.Args
@@ -141,6 +190,10 @@ func Master(cc *config.Config, pconfig *providerConfig.Config) role.Role {
 			args = append(args, k3sConfig.Args...)
 		}

+		if clusterInit && ha {
+			args = append(args, "--cluster-init")
+		}
+
 		k3sbin := utils.K3sBin()
 		if k3sbin == "" {
 			return fmt.Errorf("no k3s binary found (?)")
@@ -158,15 +211,10 @@ func Master(cc *config.Config, pconfig *providerConfig.Config) role.Role {
 			return err
 		}

-		if err := propagateMasterData(ip, c); err != nil {
+		if err := propagateMasterData(ip, c, clusterInit, ha); err != nil {
 			return err
 		}

 		return role.CreateSentinel()
 	}
 }
-
-// TODO: https://rancher.com/docs/k3s/latest/en/installation/ha-embedded/
-func HAMaster(c *service.RoleConfig) {
-	c.Logger.Info("HA Role not implemented yet")
-}

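For reference, the env and args assembled above should boil down to k3s server command lines like these (a sketch assuming the master roles start k3s with the server subcommand, which this hunk does not itself show; angle-bracket values are placeholders, non-hybrid path shown):

    # role master/clusterinit (clusterInit=true, ha=true):
    k3s server --flannel-iface=edgevpn0 --cluster-init

    # role master/ha (clusterInit=false, ha=true), once nodetoken and the
    # clusterinit master ip can be read from the ledger:
    K3S_TOKEN=<nodetoken> k3s server --flannel-iface=edgevpn0 --server=https://<clusterInitIP>:6443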

@@ -37,8 +37,8 @@ func Worker(cc *config.Config, pconfig *providerConfig.Config) role.Role {
 		}

 		nodeToken, _ := c.Client.Get("nodetoken", "token")
-		if masterIP == "" {
-			c.Logger.Info("nodetoken not there still..")
+		if nodeToken == "" {
+			c.Logger.Info("node token not there still..")
 			return nil
 		}


@@ -1,6 +1,7 @@
 package role

 import (
+	"fmt"
 	"math/rand"
 	"time"
@@ -11,10 +12,14 @@ import (
 )

 // scheduleRoles assigns roles to nodes. Meant to be called only by leaders
-// TODO: HA-Auto.
+// TODO: External DB
 func scheduleRoles(nodes []string, c *service.RoleConfig, cc *config.Config, pconfig *providerConfig.Config) error {
 	rand.Seed(time.Now().Unix())

+	if pconfig.Kairos.Hybrid {
+		c.Logger.Info("hybrid p2p with KubeVIP enabled")
+	}
+
 	// Assign roles to nodes
 	unassignedNodes, currentRoles := getRoles(c.Client, nodes)
 	c.Logger.Infof("I'm the leader. My UUID is: %s.\n Current assigned roles: %+v", c.UUID, currentRoles)
@@ -23,16 +28,22 @@ func scheduleRoles(nodes []string, c *service.RoleConfig, cc *config.Config, pco
 	masterRole := "master"
 	workerRole := "worker"
+	masterHA := "master/ha"

-	if pconfig.Kairos.Hybrid {
-		c.Logger.Info("hybrid p2p with KubeVIP enabled")
+	if pconfig.K3s.HA.Enable {
+		masterRole = "master/clusterinit"
 	}

+	mastersHA := 0
+
 	for _, r := range currentRoles {
 		if r == masterRole {
 			existsMaster = true
 		}
+		if r == masterHA {
+			mastersHA++
+		}
 	}

 	c.Logger.Infof("Master already present: %t", existsMaster)
 	c.Logger.Infof("Unassigned nodes: %+v", unassignedNodes)
@@ -60,13 +71,25 @@ func scheduleRoles(nodes []string, c *service.RoleConfig, cc *config.Config, pco
 		if err := c.Client.Set("role", selected, masterRole); err != nil {
 			return err
 		}
-		c.Logger.Info("-> Set master to", selected)
+		c.Logger.Infof("-> Set %s to %s", masterRole, selected)
 		currentRoles[selected] = masterRole
 		// Return here, so next time we get called
 		// makes sure master is set.
 		return nil
 	}

+	if pconfig.K3s.HA.Enable && pconfig.K3s.HA.MasterNodes != mastersHA {
+		if len(unassignedNodes) > 0 {
+			if err := c.Client.Set("role", unassignedNodes[0], masterHA); err != nil {
+				c.Logger.Error(err)
+				return err
+			}
+			return nil
+		} else {
+			return fmt.Errorf("not enough nodes to create ha control plane")
+		}
+	}
+
 	// cycle all empty roles and assign worker roles
 	for _, uuid := range unassignedNodes {
 		if err := c.Client.Set("role", uuid, workerRole); err != nil {