From a036ebc1be5619b395c67d2088bffb98a5bfc8d8 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Tue, 28 Oct 2014 16:49:52 -0700 Subject: [PATCH] Switch models. No master election. --- cmd/apiserver/apiserver.go | 8 +++--- pkg/master/master.go | 43 +++++++++++++--------------- pkg/master/publish.go | 57 +++++++++++++++++++++++++++----------- 3 files changed, 64 insertions(+), 44 deletions(-) diff --git a/cmd/apiserver/apiserver.go b/cmd/apiserver/apiserver.go index 7961316c68a..ac06f668370 100644 --- a/cmd/apiserver/apiserver.go +++ b/cmd/apiserver/apiserver.go @@ -44,7 +44,7 @@ import ( var ( // Note: the weird ""+ in below lines seems to be the only way to get gofmt to // arrange these text blocks sensibly. Grrr. - port = flag.Uint("port", 8080, ""+ + port = flag.Int("port", 8080, ""+ "The port to listen on. Default 8080. It is assumed that firewall rules are "+ "set up such that this port is not reachable from outside of the cluster. It is "+ "further assumed that port 443 on the cluster's public address is proxied to this "+ @@ -55,7 +55,7 @@ var ( "and it is assumed that port 443 at this address will be proxied/redirected "+ "to '-address':'-port'. If blank, the address in the first listed interface "+ "will be used.") - readOnlyPort = flag.Uint("read_only_port", 7080, ""+ + readOnlyPort = flag.Int("read_only_port", 7080, ""+ "The port from which to serve read-only resources. If 0, don't serve on a "+ "read-only address. It is assumed that firewall rules are set up such that "+ "this port is not reachable from outside of the cluster.") @@ -222,8 +222,8 @@ func main() { CorsAllowedOriginList: corsAllowedOriginList, TokenAuthFile: *tokenAuthFile, - ReadOnlyPort: int(*readOnlyPort), - ReadWritePort: int(*port), + ReadOnlyPort: *readOnlyPort, + ReadWritePort: *port, PublicAddress: *publicAddressOverride, } m := master.New(config) diff --git a/pkg/master/master.go b/pkg/master/master.go index 09b9420ebb1..a014d41eaf6 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -21,7 +21,6 @@ import ( "net/http" "strconv" "strings" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -36,7 +35,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" cloudcontroller "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller" - "github.com/GoogleCloudPlatform/kubernetes/pkg/election" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" @@ -74,6 +72,10 @@ type Config struct { CorsAllowedOriginList util.StringList TokenAuthFile string + // Number of masters running; all masters must be started with the + // same value for this field. (Numbers > 1 currently untested.) + MasterCount int + // The port on PublicAddress where a read-only server will be installed. // Defaults to 7080 if not set. ReadOnlyPort int @@ -104,18 +106,14 @@ type Master struct { apiPrefix string corsAllowedOriginList util.StringList tokenAuthFile string + masterCount int // "Outputs" Handler http.Handler - elector election.MasterElector - readOnlyServer string - readWriteServer string - electedMasterServices *util.Runner - - // lock must be held when accessing the below read-write members. - lock sync.RWMutex - electedMaster election.Master + readOnlyServer string + readWriteServer string + masterServices *util.Runner } // NewEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version @@ -133,6 +131,10 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe // setDefaults fills in any fields not set that are required to have valid data. func setDefaults(c *Config) { + if c.MasterCount == 0 { + // Clearly, there will be at least one master. + c.MasterCount = 1 + } if c.ReadOnlyPort == 0 { c.ReadOnlyPort = 7080 } @@ -140,6 +142,9 @@ func setDefaults(c *Config) { c.ReadWritePort = 443 } if c.PublicAddress == "" { + // Find and use the first non-loopback address. + // TODO: potentially it'd be useful to skip the docker interface if it + // somehow is first in the list. addrs, err := net.InterfaceAddrs() if err != nil { glog.Fatalf("Unable to get network interfaces: error='%v'", err) @@ -190,11 +195,11 @@ func New(c *Config) *Master { apiPrefix: c.APIPrefix, corsAllowedOriginList: c.CorsAllowedOriginList, tokenAuthFile: c.TokenAuthFile, - elector: election.NewEtcdMasterElector(c.EtcdHelper.Client), + masterCount: c.MasterCount, readOnlyServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadOnlyPort))), readWriteServer: net.JoinHostPort(c.PublicAddress, strconv.Itoa(int(c.ReadWritePort))), } - m.electedMasterServices = util.NewRunner(m.serviceWriterLoop, m.electionAnnounce) + m.masterServices = util.NewRunner(m.serviceWriterLoop, m.roServiceWriterLoop) m.init(c) return m } @@ -281,18 +286,8 @@ func (m *Master) init(c *Config) { m.Handler = handler - if m.readWriteServer != "" { - glog.Infof("Starting election services as %v", m.readWriteServer) - go election.Notify(m.elector, "/registry/elections/k8smaster", m.readWriteServer, m.electedMasterServices) - } - - // TODO: start a goroutine to report ourselves to the elected master. -} - -func (m *Master) electionAnnounce(stop chan struct{}) { - glog.Infof("Elected as master") - <-stop - glog.Info("Lost election for master") + // TODO: Attempt clean shutdown? + m.masterServices.Start() } // API_v1beta1 returns the resources and codec for API version v1beta1. diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 6fc19d26cb3..83d3447ca06 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -35,21 +35,31 @@ func (m *Master) serviceWriterLoop(stop chan struct{}) { if err := m.createMasterServiceIfNeeded("kubernetes", 443); err != nil { glog.Errorf("Can't create rw service: %v", err) } - if err := m.setEndpoints("kubernetes", []string{m.readWriteServer}); err != nil { + if err := m.ensureEndpointsContain("kubernetes", m.readWriteServer); err != nil { glog.Errorf("Can't create rw endpoints: %v", err) } - } else { - m.deleteMasterService("kubernetes") } + + select { + case <-stop: + return + case <-time.After(10 * time.Second): + } + } +} + +func (m *Master) roServiceWriterLoop(stop chan struct{}) { + for { + // Update service & endpoint records. + // TODO: when it becomes possible to change this stuff, + // stop polling and start watching. if m.readOnlyServer != "" { if err := m.createMasterServiceIfNeeded("kubernetes-ro", 80); err != nil { glog.Errorf("Can't create ro service: %v", err) } - if err := m.setEndpoints("kubernetes-ro", []string{m.readOnlyServer}); err != nil { - glog.Errorf("Can't create rw endpoints: %v", err) + if err := m.ensureEndpointsContain("kubernetes-ro", m.readOnlyServer); err != nil { + glog.Errorf("Can't create ro endpoints: %v", err) } - } else { - m.deleteMasterService("kubernetes-ro") } select { @@ -86,20 +96,18 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, port int) error return err } resp := <-c - if _, ok := resp.(*api.Service); ok { + if _, ok := resp.Object.(*api.Service); ok { // If all worked, we get back an *api.Service object. return nil } return fmt.Errorf("Unexpected response: %#v", resp) } -func (m *Master) deleteMasterService(serviceName string) { - ctx := api.NewDefaultContext() - m.serviceRegistry.DeleteService(ctx, serviceName) -} - -// setEndpoints sets the endpoints for the given service. -func (m *Master) setEndpoints(serviceName string, endpoints []string) error { +// ensureEndpointsContain sets the endpoints for the given service. Also removes +// excess endpoints (as determined by m.masterCount). Extra endpoints could appear +// in the list if, for example, the master starts running on a different machine, +// changing IP addresses. +func (m *Master) ensureEndpointsContain(serviceName string, endpoint string) error { ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) if err != nil { @@ -108,6 +116,23 @@ func (m *Master) setEndpoints(serviceName string, endpoints []string) error { e.ObjectMeta.Name = serviceName e.ObjectMeta.Namespace = "default" } - e.Endpoints = endpoints + found := false + for i := range e.Endpoints { + if e.Endpoints[i] == endpoint { + found = true + break + } + } + if !found { + e.Endpoints = append(e.Endpoints, endpoint) + } + if len(e.Endpoints) > m.masterCount { + // We append to the end and remove from the beginning, so this should + // converge rapidly with all masters performing this operation. + e.Endpoints = e.Endpoints[len(e.Endpoints)-m.masterCount:] + } else if found { + // We didn't make any changes, no need to actually call update. + return nil + } return m.endpointRegistry.UpdateEndpoints(ctx, e) }