diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 95c7b49bdab..e0482e65f68 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -44,8 +44,6 @@ import ( type Controller struct { NamespaceRegistry namespace.Registry ServiceRegistry service.Registry - // TODO: MasterCount is yucky - MasterCount int ServiceClusterIPRegistry service.RangeRegistry ServiceClusterIPInterval time.Duration @@ -55,8 +53,8 @@ type Controller struct { ServiceNodePortInterval time.Duration ServiceNodePortRange utilnet.PortRange - EndpointRegistry endpoint.Registry - EndpointInterval time.Duration + EndpointReconciler EndpointReconciler + EndpointInterval time.Duration SystemNamespaces []string SystemNamespacesInterval time.Duration @@ -140,7 +138,7 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error { return err } endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) - if err := c.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil { + if err := c.EndpointReconciler.ReconcileEndpoints("kubernetes", c.PublicIP, endpointPorts, reconcile); err != nil { return err } } @@ -240,6 +238,39 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser return err } +// EndpointReconciler knows how to reconcile the endpoints for the apiserver service. +type EndpointReconciler interface { + // ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw). + // ReconcileEndpoints expects that the endpoints objects it manages will all be + // managed only by ReconcileEndpoints; therefore, to understand this, you need only + // understand the requirements. + // + // Requirements: + // * All apiservers MUST use the same ports for their {rw, ro} services. + // * All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the + // endpoints for their {rw, ro} services. + // * ReconcileEndpoints is called periodically from all apiservers. + ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error +} + +// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of +// masters. masterCountEndpointReconciler implements EndpointReconciler. +type masterCountEndpointReconciler struct { + masterCount int + endpointRegistry endpoint.Registry +} + +var _ EndpointReconciler = &masterCountEndpointReconciler{} + +// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a +// specified expected number of masters. +func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint.Registry) *masterCountEndpointReconciler { + return &masterCountEndpointReconciler{ + masterCount: masterCount, + endpointRegistry: endpointRegistry, + } +} + // ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw). // ReconcileEndpoints expects that the endpoints objects it manages will all be // managed only by ReconcileEndpoints; therefore, to understand this, you need only @@ -252,10 +283,9 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser // * All apiservers MUST know and agree on the number of apiservers expected // to be running (c.masterCount). // * ReconcileEndpoints is called periodically from all apiservers. -// -func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { +func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error { ctx := api.NewDefaultContext() - e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName) + e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName) if err != nil { e = &api.Endpoints{ ObjectMeta: api.ObjectMeta{ @@ -267,7 +297,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP // First, determine if the endpoint is in the format we expect (one // subset, ports matching endpointPorts, N IP addresses). - formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount, reconcilePorts) + formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts) if !formatCorrect { // Something is egregiously wrong, just re-make the endpoints record. e.Subsets = []api.EndpointSubset{{ @@ -275,7 +305,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP Ports: endpointPorts, }} glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) - return c.EndpointRegistry.UpdateEndpoints(ctx, e) + return r.endpointRegistry.UpdateEndpoints(ctx, e) } if ipCorrect && portsCorrect { return nil @@ -291,11 +321,11 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP // own IP address. Given the requirements stated at the top of // this function, this should cause the list of IP addresses to // become eventually correct. - if addrs := &e.Subsets[0].Addresses; len(*addrs) > c.MasterCount { + if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount { // addrs is a pointer because we're going to mutate it. for i, addr := range *addrs { if addr.IP == ip.String() { - for len(*addrs) > c.MasterCount { + for len(*addrs) > r.masterCount { // wrap around if necessary. remove := (i + 1) % len(*addrs) *addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...) @@ -310,7 +340,7 @@ func (c *Controller) ReconcileEndpoints(serviceName string, ip net.IP, endpointP e.Subsets[0].Ports = endpointPorts } glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) - return c.EndpointRegistry.UpdateEndpoints(ctx, e) + return r.endpointRegistry.UpdateEndpoints(ctx, e) } // Determine if the endpoint is in the format ReconcileEndpoints expects. diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index e79c4af69b0..63b9bfb0f09 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -371,12 +371,11 @@ func TestReconcileEndpoints(t *testing.T) { }, } for _, test := range reconcile_tests { - master := Controller{MasterCount: test.additionalMasters + 1} registry := ®istrytest.EndpointRegistry{ Endpoints: test.endpoints, } - master.EndpointRegistry = registry - err := master.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) + reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry) + err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } @@ -461,12 +460,11 @@ func TestReconcileEndpoints(t *testing.T) { }, } for _, test := range non_reconcile_tests { - master := Controller{MasterCount: test.additionalMasters + 1} registry := ®istrytest.EndpointRegistry{ Endpoints: test.endpoints, } - master.EndpointRegistry = registry - err := master.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) + reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry) + err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } @@ -519,7 +517,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range create_tests { - master := Controller{MasterCount: 1} + master := Controller{} registry := ®istrytest.ServiceRegistry{ Err: errors.New("unable to get svc"), } @@ -794,7 +792,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range reconcile_tests { - master := Controller{MasterCount: 1} + master := Controller{} registry := ®istrytest.ServiceRegistry{ Service: test.service, } @@ -846,7 +844,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range non_reconcile_tests { - master := Controller{MasterCount: 1} + master := Controller{} registry := ®istrytest.ServiceRegistry{ Service: test.service, } diff --git a/pkg/master/master.go b/pkg/master/master.go index bc32ace6564..a6c4b323661 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -590,10 +590,9 @@ func (m *Master) NewBootstrapController() *Controller { return &Controller{ NamespaceRegistry: m.namespaceRegistry, ServiceRegistry: m.serviceRegistry, - MasterCount: m.MasterCount, - EndpointRegistry: m.endpointRegistry, - EndpointInterval: 10 * time.Second, + EndpointReconciler: NewMasterCountEndpointReconciler(m.MasterCount, m.endpointRegistry), + EndpointInterval: 10 * time.Second, SystemNamespaces: []string{api.NamespaceSystem}, SystemNamespacesInterval: 1 * time.Minute, diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 57b1b930ca7..b0eb668acc5 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -257,10 +257,9 @@ func TestNewBootstrapController(t *testing.T) { controller := master.NewBootstrapController() assert.Equal(controller.NamespaceRegistry, master.namespaceRegistry) - assert.Equal(controller.EndpointRegistry, master.endpointRegistry) + assert.Equal(controller.EndpointReconciler, NewMasterCountEndpointReconciler(master.MasterCount, master.endpointRegistry)) assert.Equal(controller.ServiceRegistry, master.serviceRegistry) assert.Equal(controller.ServiceNodePortRange, portRange) - assert.Equal(controller.MasterCount, master.MasterCount) assert.Equal(controller.ServicePort, master.ServiceReadWritePort) assert.Equal(controller.PublicServicePort, master.PublicReadWritePort) }