Make PortalIP alloc HA

* Add an allocator which saves state in etcd
* Perform PortalIP allocation check on startup and periodically afterwards

Also expose methods in master so that downstream components can handle
IP allocation and master registration themselves.
This commit is contained in:
Clayton Coleman
2015-05-03 18:44:05 -04:00
parent 0d16f43475
commit e200d5a317
31 changed files with 1475 additions and 811 deletions

View File

@@ -63,6 +63,8 @@ import (
resourcequotaetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota/etcd"
secretetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/secret/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
ipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator"
etcdipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/ui"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -78,13 +80,15 @@ const (
// Config is a structure used to configure a Master.
type Config struct {
EtcdHelper tools.EtcdHelper
EventTTL time.Duration
MinionRegexp string
KubeletClient client.KubeletClient
PortalNet *net.IPNet
EnableLogsSupport bool
EnableUISupport bool
EtcdHelper tools.EtcdHelper
EventTTL time.Duration
MinionRegexp string
KubeletClient client.KubeletClient
PortalNet *net.IPNet
// allow downstream consumers to disable the core controller loops
EnableCoreControllers bool
EnableLogsSupport bool
EnableUISupport bool
// allow downstream consumers to disable swagger
EnableSwaggerSupport bool
// allow v1beta3 to be conditionally disabled
@@ -144,6 +148,7 @@ type Master struct {
muxHelper *apiserver.MuxHelper
handlerContainer *restful.Container
rootWebService *restful.WebService
enableCoreControllers bool
enableLogsSupport bool
enableUISupport bool
enableSwaggerSupport bool
@@ -180,6 +185,7 @@ type Master struct {
namespaceRegistry namespace.Registry
serviceRegistry service.Registry
endpointRegistry endpoint.Registry
portalAllocator service.IPRegistry
// "Outputs"
Handler http.Handler
@@ -208,6 +214,9 @@ func setDefaults(c *Config) {
if err != nil {
glog.Fatalf("Unable to parse CIDR: %v", err)
}
if size := ipallocator.RangeSize(portalNet); size < 8 {
glog.Fatalf("The portal net range must be at least %d IP addresses", 8)
}
c.PortalNet = portalNet
}
if c.MasterCount == 0 {
@@ -271,11 +280,11 @@ func New(c *Config) *Master {
}
// Select the first two valid IPs from portalNet to use as the master service portalIPs
serviceReadOnlyIP, err := service.GetIndexedIP(c.PortalNet, 1)
serviceReadOnlyIP, err := ipallocator.GetIndexedIP(c.PortalNet, 1)
if err != nil {
glog.Fatalf("Failed to generate service read-only IP for master service: %v", err)
}
serviceReadWriteIP, err := service.GetIndexedIP(c.PortalNet, 2)
serviceReadWriteIP, err := ipallocator.GetIndexedIP(c.PortalNet, 2)
if err != nil {
glog.Fatalf("Failed to generate service read-write IP for master service: %v", err)
}
@@ -284,6 +293,7 @@ func New(c *Config) *Master {
m := &Master{
portalNet: c.PortalNet,
rootWebService: new(restful.WebService),
enableCoreControllers: c.EnableCoreControllers,
enableLogsSupport: c.EnableLogsSupport,
enableUISupport: c.EnableUISupport,
enableSwaggerSupport: c.EnableSwaggerSupport,
@@ -324,7 +334,6 @@ func New(c *Config) *Master {
m.handlerContainer.Router(restful.CurlyRouter{})
m.muxHelper = &apiserver.MuxHelper{m.mux, []string{}}
m.masterServices = util.NewRunner(m.serviceWriterLoop, m.roServiceWriterLoop)
m.init(c)
return m
}
@@ -405,6 +414,10 @@ func (m *Master) init(c *Config) {
registry := etcd.NewRegistry(c.EtcdHelper, podRegistry, m.endpointRegistry)
m.serviceRegistry = registry
ipAllocator := ipallocator.NewCIDRRange(m.portalNet)
portalAllocator := etcdipallocator.NewEtcd(ipAllocator, c.EtcdHelper)
m.portalAllocator = portalAllocator
controllerStorage := controlleretcd.NewREST(c.EtcdHelper)
// TODO: Factor out the core API registration
@@ -421,7 +434,7 @@ func (m *Master) init(c *Config) {
"podTemplates": podTemplateStorage,
"replicationControllers": controllerStorage,
"services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, m.portalNet, c.ClusterName),
"services": service.NewStorage(m.serviceRegistry, m.nodeRegistry, m.endpointRegistry, portalAllocator, c.ClusterName),
"endpoints": endpointsStorage,
"minions": nodeStorage,
"minions/status": nodeStatusStorage,
@@ -544,7 +557,9 @@ func (m *Master) init(c *Config) {
}
// TODO: Attempt clean shutdown?
m.masterServices.Start()
if m.enableCoreControllers {
m.StartCoreControllers()
}
}
// InstallSwaggerAPI installs the /swaggerapi/ endpoint to allow schema discovery

View File

@@ -24,11 +24,28 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
servicecontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
func (m *Master) serviceWriterLoop(stop chan struct{}) {
// StartCoreControllers starts the core controller loops required to bootstrap
// a cluster. It is a no-op when m.masterServices has already been set.
//
// A single portal IP repair pass is executed synchronously first; a failure of
// that pass is logged and does not prevent the loops from starting. Afterwards
// a runner is created for ServiceWriterLoop, ROServiceWriterLoop and the
// repair controller's RunUntil, stored on the master, and started.
func (m *Master) StartCoreControllers() {
	if m.masterServices != nil {
		// A runner is already registered; do not create a second one.
		return
	}

	// NOTE(review): the 3 minute duration is presumably the interval between
	// periodic repair passes — confirm against servicecontroller.NewRepair.
	repair := servicecontroller.NewRepair(3*time.Minute, m.serviceRegistry, m.portalNet, m.portalAllocator)

	// Perform the startup allocation check before any loop is running.
	err := repair.RunOnce()
	if err != nil {
		glog.Errorf("Unable to perform initial IP allocation check: %v", err)
	}

	runner := util.NewRunner(m.ServiceWriterLoop, m.ROServiceWriterLoop, repair.RunUntil)
	m.masterServices = runner
	runner.Start()
}
// ServiceWriterLoop is exposed for downstream consumers of master
func (m *Master) ServiceWriterLoop(stop chan struct{}) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
@@ -36,14 +53,14 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) {
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
// TODO: add endpoints of all replicas, not just the elected master.
if err := m.createMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil {
if err := m.CreateMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil {
glog.Errorf("Can't create master namespace: %v", err)
}
if m.serviceReadWriteIP != nil {
if err := m.createMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil && !errors.IsAlreadyExists(err) {
if err := m.CreateMasterServiceIfNeeded("kubernetes", m.serviceReadWriteIP, m.serviceReadWritePort); err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("Can't create rw service: %v", err)
}
if err := m.setEndpoints("kubernetes", m.clusterIP, m.publicReadWritePort); err != nil {
if err := m.SetEndpoints("kubernetes", m.clusterIP, m.publicReadWritePort); err != nil {
glog.Errorf("Can't create rw endpoints: %v", err)
}
}
@@ -56,21 +73,22 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) {
}
}
func (m *Master) roServiceWriterLoop(stop chan struct{}) {
// ROServiceWriterLoop is exposed for downstream consumers of master
func (m *Master) ROServiceWriterLoop(stop chan struct{}) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
// Update service & endpoint records.
// TODO: when it becomes possible to change this stuff,
// stop polling and start watching.
if err := m.createMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil {
if err := m.CreateMasterNamespaceIfNeeded(api.NamespaceDefault); err != nil {
glog.Errorf("Can't create master namespace: %v", err)
}
if m.serviceReadOnlyIP != nil {
if err := m.createMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil && !errors.IsAlreadyExists(err) {
if err := m.CreateMasterServiceIfNeeded("kubernetes-ro", m.serviceReadOnlyIP, m.serviceReadOnlyPort); err != nil && !errors.IsAlreadyExists(err) {
glog.Errorf("Can't create ro service: %v", err)
}
if err := m.setEndpoints("kubernetes-ro", m.clusterIP, m.publicReadOnlyPort); err != nil {
if err := m.SetEndpoints("kubernetes-ro", m.clusterIP, m.publicReadOnlyPort); err != nil {
glog.Errorf("Can't create ro endpoints: %v", err)
}
}
@@ -83,8 +101,8 @@ func (m *Master) roServiceWriterLoop(stop chan struct{}) {
}
}
// createMasterNamespaceIfNeeded will create the namespace that contains the master services if it doesn't already exist
func (m *Master) createMasterNamespaceIfNeeded(ns string) error {
// CreateMasterNamespaceIfNeeded will create the namespace that contains the master services if it doesn't already exist
func (m *Master) CreateMasterNamespaceIfNeeded(ns string) error {
ctx := api.NewContext()
if _, err := m.namespaceRegistry.GetNamespace(ctx, api.NamespaceDefault); err == nil {
// the namespace already exists
@@ -103,9 +121,9 @@ func (m *Master) createMasterNamespaceIfNeeded(ns string) error {
return err
}
// createMasterServiceIfNeeded will create the specified service if it
// CreateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort int) error {
func (m *Master) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort int) error {
ctx := api.NewDefaultContext()
if _, err := m.serviceRegistry.GetService(ctx, serviceName); err == nil {
// The service already exists.
@@ -132,20 +150,20 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I
return err
}
// setEndpoints sets the endpoints for the given apiserver service (ro or rw).
// setEndpoints expects that the endpoints objects it manages will all be
// managed only by setEndpoints; therefore, to understand this, you need only
// SetEndpoints sets the endpoints for the given apiserver service (ro or rw).
// SetEndpoints expects that the endpoints objects it manages will all be
// managed only by SetEndpoints; therefore, to understand this, you need only
// understand the requirements and the body of this function.
//
// Requirements:
// * All apiservers MUST use the same ports for their {rw, ro} services.
// * All apiservers MUST use setEndpoints and only setEndpoints to manage the
// * All apiservers MUST use SetEndpoints and only SetEndpoints to manage the
// endpoints for their {rw, ro} services.
// * All apiservers MUST know and agree on the number of apiservers expected
// to be running (m.masterCount).
// * setEndpoints is called periodically from all apiservers.
// * SetEndpoints is called periodically from all apiservers.
//
func (m *Master) setEndpoints(serviceName string, ip net.IP, port int) error {
func (m *Master) SetEndpoints(serviceName string, ip net.IP, port int) error {
ctx := api.NewDefaultContext()
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
if err != nil {