mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-03 09:22:44 +00:00
Switch models. No master election.
This commit is contained in:
parent
35bd8d4a11
commit
a036ebc1be
@ -44,7 +44,7 @@ import (
|
|||||||
var (
|
var (
|
||||||
// Note: the weird ""+ in below lines seems to be the only way to get gofmt to
|
// Note: the weird ""+ in below lines seems to be the only way to get gofmt to
|
||||||
// arrange these text blocks sensibly. Grrr.
|
// arrange these text blocks sensibly. Grrr.
|
||||||
port = flag.Uint("port", 8080, ""+
|
port = flag.Int("port", 8080, ""+
|
||||||
"The port to listen on. Default 8080. It is assumed that firewall rules are "+
|
"The port to listen on. Default 8080. It is assumed that firewall rules are "+
|
||||||
"set up such that this port is not reachable from outside of the cluster. It is "+
|
"set up such that this port is not reachable from outside of the cluster. It is "+
|
||||||
"further assumed that port 443 on the cluster's public address is proxied to this "+
|
"further assumed that port 443 on the cluster's public address is proxied to this "+
|
||||||
@ -55,7 +55,7 @@ var (
|
|||||||
"and it is assumed that port 443 at this address will be proxied/redirected "+
|
"and it is assumed that port 443 at this address will be proxied/redirected "+
|
||||||
"to '-address':'-port'. If blank, the address in the first listed interface "+
|
"to '-address':'-port'. If blank, the address in the first listed interface "+
|
||||||
"will be used.")
|
"will be used.")
|
||||||
readOnlyPort = flag.Uint("read_only_port", 7080, ""+
|
readOnlyPort = flag.Int("read_only_port", 7080, ""+
|
||||||
"The port from which to serve read-only resources. If 0, don't serve on a "+
|
"The port from which to serve read-only resources. If 0, don't serve on a "+
|
||||||
"read-only address. It is assumed that firewall rules are set up such that "+
|
"read-only address. It is assumed that firewall rules are set up such that "+
|
||||||
"this port is not reachable from outside of the cluster.")
|
"this port is not reachable from outside of the cluster.")
|
||||||
@ -222,8 +222,8 @@ func main() {
|
|||||||
CorsAllowedOriginList: corsAllowedOriginList,
|
CorsAllowedOriginList: corsAllowedOriginList,
|
||||||
TokenAuthFile: *tokenAuthFile,
|
TokenAuthFile: *tokenAuthFile,
|
||||||
|
|
||||||
ReadOnlyPort: int(*readOnlyPort),
|
ReadOnlyPort: *readOnlyPort,
|
||||||
ReadWritePort: int(*port),
|
ReadWritePort: *port,
|
||||||
PublicAddress: *publicAddressOverride,
|
PublicAddress: *publicAddressOverride,
|
||||||
}
|
}
|
||||||
m := master.New(config)
|
m := master.New(config)
|
||||||
|
@ -21,7 +21,6 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
@ -36,7 +35,6 @@ import (
|
|||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
|
||||||
cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
|
cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/election"
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
|
||||||
@ -74,6 +72,10 @@ type Config struct {
|
|||||||
CorsAllowedOriginList util.StringList
|
CorsAllowedOriginList util.StringList
|
||||||
TokenAuthFile string
|
TokenAuthFile string
|
||||||
|
|
||||||
|
// Number of masters running; all masters must be started with the
|
||||||
|
// same value for this field. (Numbers > 1 currently untested.)
|
||||||
|
MasterCount int
|
||||||
|
|
||||||
// The port on PublicAddress where a read-only server will be installed.
|
// The port on PublicAddress where a read-only server will be installed.
|
||||||
// Defaults to 7080 if not set.
|
// Defaults to 7080 if not set.
|
||||||
ReadOnlyPort int
|
ReadOnlyPort int
|
||||||
@ -104,18 +106,14 @@ type Master struct {
|
|||||||
apiPrefix string
|
apiPrefix string
|
||||||
corsAllowedOriginList util.StringList
|
corsAllowedOriginList util.StringList
|
||||||
tokenAuthFile string
|
tokenAuthFile string
|
||||||
|
masterCount int
|
||||||
|
|
||||||
// "Outputs"
|
// "Outputs"
|
||||||
Handler http.Handler
|
Handler http.Handler
|
||||||
|
|
||||||
elector election.MasterElector
|
readOnlyServer string
|
||||||
readOnlyServer string
|
readWriteServer string
|
||||||
readWriteServer string
|
masterServices *util.Runner
|
||||||
electedMasterServices *util.Runner
|
|
||||||
|
|
||||||
// lock must be held when accessing the below read-write members.
|
|
||||||
lock sync.RWMutex
|
|
||||||
electedMaster election.Master
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
|
// NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
|
||||||
@ -133,6 +131,10 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe
|
|||||||
|
|
||||||
// setDefaults fills in any fields not set that are required to have valid data.
|
// setDefaults fills in any fields not set that are required to have valid data.
|
||||||
func setDefaults(c *Config) {
|
func setDefaults(c *Config) {
|
||||||
|
if c.MasterCount == 0 {
|
||||||
|
// Clearly, there will be at least one master.
|
||||||
|
c.MasterCount = 1
|
||||||
|
}
|
||||||
if c.ReadOnlyPort == 0 {
|
if c.ReadOnlyPort == 0 {
|
||||||
c.ReadOnlyPort = 7080
|
c.ReadOnlyPort = 7080
|
||||||
}
|
}
|
||||||
@ -140,6 +142,9 @@ func setDefaults(c *Config) {
|
|||||||
c.ReadWritePort = 443
|
c.ReadWritePort = 443
|
||||||
}
|
}
|
||||||
if c.PublicAddress == "" {
|
if c.PublicAddress == "" {
|
||||||
|
// Find and use the first non-loopback address.
|
||||||
|
// TODO: potentially it'd be useful to skip the docker interface if it
|
||||||
|
// somehow is first in the list.
|
||||||
addrs, err := net.InterfaceAddrs()
|
addrs, err := net.InterfaceAddrs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Fatalf("Unable to get network interfaces: error='%v'", err)
|
glog.Fatalf("Unable to get network interfaces: error='%v'", err)
|
||||||
@ -190,11 +195,11 @@ func New(c *Config) *Master {
|
|||||||
apiPrefix: c.APIPrefix,
|
apiPrefix: c.APIPrefix,
|
||||||
corsAllowedOriginList: c.CorsAllowedOriginList,
|
corsAllowedOriginList: c.CorsAllowedOriginList,
|
||||||
tokenAuthFile: c.TokenAuthFile,
|
tokenAuthFile: c.TokenAuthFile,
|
||||||
elector: election.NewEtcdMasterElector(c.EtcdHelper.Client),
|
masterCount: c.MasterCount,
|
||||||
readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))),
|
readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))),
|
||||||
readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))),
|
readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))),
|
||||||
}
|
}
|
||||||
m.electedMasterServices = util.NewRunner(m.serviceWriterLoop, m.electionAnnounce)
|
m.masterServices = util.NewRunner(m.serviceWriterLoop, m.roServiceWriterLoop)
|
||||||
m.init(c)
|
m.init(c)
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
@ -281,18 +286,8 @@ func (m *Master) init(c *Config) {
|
|||||||
|
|
||||||
m.Handler = handler
|
m.Handler = handler
|
||||||
|
|
||||||
if m.readWriteServer != "" {
|
// TODO: Attempt clean shutdown?
|
||||||
glog.Infof("Starting election services as %v", m.readWriteServer)
|
m.masterServices.Start()
|
||||||
go election.Notify(m.elector, "/registry/elections/k8smaster", m.readWriteServer, m.electedMasterServices)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: start a goroutine to report ourselves to the elected master.
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *Master) electionAnnounce(stop chan struct{}) {
|
|
||||||
glog.Infof("Elected as master")
|
|
||||||
<-stop
|
|
||||||
glog.Info("Lost election for master")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// API_v1beta1 returns the resources and codec for API version v1beta1.
|
// API_v1beta1 returns the resources and codec for API version v1beta1.
|
||||||
|
@ -35,21 +35,31 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) {
|
|||||||
if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil {
|
if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil {
|
||||||
glog.Errorf("Can't create rw service: %v", err)
|
glog.Errorf("Can't create rw service: %v", err)
|
||||||
}
|
}
|
||||||
if err := m.setEndpoints("kubernetes", []string{m.readWriteServer}); err != nil {
|
if err := m.ensureEndpointsContain("kubernetes", m.readWriteServer); err != nil {
|
||||||
glog.Errorf("Can't create rw endpoints: %v", err)
|
glog.Errorf("Can't create rw endpoints: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
m.deleteMasterService("kubernetes")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stop:
|
||||||
|
return
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Master) roServiceWriterLoop(stop chan struct{}) {
|
||||||
|
for {
|
||||||
|
// Update service & endpoint records.
|
||||||
|
// TODO: when it becomes possible to change this stuff,
|
||||||
|
// stop polling and start watching.
|
||||||
if m.readOnlyServer != "" {
|
if m.readOnlyServer != "" {
|
||||||
if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil {
|
if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil {
|
||||||
glog.Errorf("Can't create ro service: %v", err)
|
glog.Errorf("Can't create ro service: %v", err)
|
||||||
}
|
}
|
||||||
if err := m.setEndpoints("kubernetes-ro", []string{m.readOnlyServer}); err != nil {
|
if err := m.ensureEndpointsContain("kubernetes-ro", m.readOnlyServer); err != nil {
|
||||||
glog.Errorf("Can't create rw endpoints: %v", err)
|
glog.Errorf("Can't create ro endpoints: %v", err)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
m.deleteMasterService("kubernetes-ro")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -86,20 +96,18 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
resp := <-c
|
resp := <-c
|
||||||
if _, ok := resp.(*api.Service); ok {
|
if _, ok := resp.Object.(*api.Service); ok {
|
||||||
// If all worked, we get back an *api.Service object.
|
// If all worked, we get back an *api.Service object.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("Unexpected response: %#v", resp)
|
return fmt.Errorf("Unexpected response: %#v", resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Master) deleteMasterService(serviceName string) {
|
// ensureEndpointsContain sets the endpoints for the given service. Also removes
|
||||||
ctx := api.NewDefaultContext()
|
// excess endpoints (as determined by m.masterCount). Extra endpoints could appear
|
||||||
m.serviceRegistry.DeleteService(ctx, serviceName)
|
// in the list if, for example, the master starts running on a different machine,
|
||||||
}
|
// changing IP addresses.
|
||||||
|
func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) error {
|
||||||
// setEndpoints sets the endpoints for the given service.
|
|
||||||
func (m *Master) setEndpoints(serviceName string, endpoints []string) error {
|
|
||||||
ctx := api.NewDefaultContext()
|
ctx := api.NewDefaultContext()
|
||||||
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
|
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -108,6 +116,23 @@ func (m *Master) setEndpoints(serviceName string, endpoints []string) error {
|
|||||||
e.ObjectMeta.Name = serviceName
|
e.ObjectMeta.Name = serviceName
|
||||||
e.ObjectMeta.Namespace = "default"
|
e.ObjectMeta.Namespace = "default"
|
||||||
}
|
}
|
||||||
e.Endpoints = endpoints
|
found := false
|
||||||
|
for i := range e.Endpoints {
|
||||||
|
if e.Endpoints[i] == endpoint {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !found {
|
||||||
|
e.Endpoints = append(e.Endpoints, endpoint)
|
||||||
|
}
|
||||||
|
if len(e.Endpoints) > m.masterCount {
|
||||||
|
// We append to the end and remove from the beginning, so this should
|
||||||
|
// converge rapidly with all masters performing this operation.
|
||||||
|
e.Endpoints = e.Endpoints[len(e.Endpoints)-m.masterCount:]
|
||||||
|
} else if found {
|
||||||
|
// We didn't make any changes, no need to actually call update.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return m.endpointRegistry.UpdateEndpoints(ctx, e)
|
return m.endpointRegistry.UpdateEndpoints(ctx, e)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user