Additional service ports config for master service.

This commit is contained in:
Andrew Butcher 2015-10-07 11:06:05 -04:00
parent eeeb5e0cd6
commit efd8e3c9c7
4 changed files with 214 additions and 75 deletions

View File

@ -59,6 +59,8 @@ type Controller struct {
ServiceIP net.IP
ServicePort int
ExtraServicePorts []api.ServicePort
ExtraEndpointPorts []api.EndpointPort
PublicServicePort int
KubernetesServiceNodePort int
@ -111,10 +113,12 @@ func (c *Controller) UpdateKubernetesService() error {
return err
}
if c.ServiceIP != nil {
if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, c.ServicePort, c.KubernetesServiceNodePort); err != nil {
servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts)
if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType); err != nil {
return err
}
if err := c.SetEndpoints("kubernetes", c.PublicIP, c.PublicServicePort); err != nil {
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
if err := c.SetEndpoints("kubernetes", c.PublicIP, endpointPorts); err != nil {
return err
}
}
@ -143,31 +147,44 @@ func (c *Controller) CreateNamespaceIfNeeded(ns string) error {
// createPortAndServiceSpec creates an array of service ports.
// If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed.
func createPortAndServiceSpec(servicePort int, nodePort int) ([]api.ServicePort, api.ServiceType) {
func createPortAndServiceSpec(servicePort int, nodePort int, servicePortName string, extraServicePorts []api.ServicePort) ([]api.ServicePort, api.ServiceType) {
//Use the Cluster IP type for the service port if NodePort isn't provided.
//Otherwise, we will be binding the master service to a NodePort.
if nodePort <= 0 {
return []api.ServicePort{{Protocol: api.ProtocolTCP,
Port: servicePort,
TargetPort: util.NewIntOrStringFromInt(servicePort)}}, api.ServiceTypeClusterIP
}
return []api.ServicePort{{Protocol: api.ProtocolTCP,
servicePorts := []api.ServicePort{{Protocol: api.ProtocolTCP,
Port: servicePort,
TargetPort: util.NewIntOrStringFromInt(servicePort),
NodePort: nodePort,
}}, api.ServiceTypeNodePort
Name: servicePortName,
TargetPort: util.NewIntOrStringFromInt(servicePort)}}
serviceType := api.ServiceTypeClusterIP
if nodePort > 0 {
servicePorts[0].NodePort = nodePort
serviceType = api.ServiceTypeNodePort
}
if extraServicePorts != nil {
servicePorts = append(servicePorts, extraServicePorts...)
}
return servicePorts, serviceType
}
// createEndpointPortSpec creates an array of endpoint ports
func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []api.EndpointPort) []api.EndpointPort {
endpointPorts := []api.EndpointPort{{Protocol: api.ProtocolTCP,
Port: endpointPort,
Name: endpointPortName,
}}
if extraEndpointPorts != nil {
endpointPorts = append(endpointPorts, extraEndpointPorts...)
}
return endpointPorts
}
// CreateMasterServiceIfNeeded will create the specified service if it
// doesn't already exist.
func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort, nodePort int) error {
func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType) error {
ctx := api.NewDefaultContext()
if _, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil {
// The service already exists.
return nil
}
ports, serviceType := createPortAndServiceSpec(servicePort, nodePort)
svc := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: serviceName,
@ -175,7 +192,7 @@ func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP n
Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"},
},
Spec: api.ServiceSpec{
Ports: ports,
Ports: servicePorts,
// maintained by this code, not by the pod selector
Selector: nil,
ClusterIP: serviceIP.String(),
@ -207,7 +224,7 @@ func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP n
// to be running (c.masterCount).
// * SetEndpoints is called periodically from all apiservers.
//
func (c *Controller) SetEndpoints(serviceName string, ip net.IP, port int) error {
func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error {
ctx := api.NewDefaultContext()
e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName)
if err != nil {
@ -220,13 +237,13 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, port int) error
}
// First, determine if the endpoint is in the format we expect (one
// subset, one port, N IP addresses).
formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), port, c.MasterCount)
// subset, ports matching endpointPorts, N IP addresses).
formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount)
if !formatCorrect {
// Something is egregiously wrong, just re-make the endpoints record.
e.Subsets = []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: ip.String()}},
Ports: []api.EndpointPort{{Port: port, Protocol: api.ProtocolTCP}},
Ports: endpointPorts,
}}
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
return c.EndpointRegistry.UpdateEndpoints(ctx, e)
@ -259,17 +276,19 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, port int) error
// Determine if the endpoint is in the format SetEndpoints expect (one subset,
// one port, N IP addresses); and if the specified IP address is present and
// the correct number of ip addresses are found.
func checkEndpointSubsetFormat(e *api.Endpoints, ip string, port int, count int) (formatCorrect, ipCorrect bool) {
func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int) (formatCorrect, ipCorrect bool) {
if len(e.Subsets) != 1 {
return false, false
}
sub := &e.Subsets[0]
if len(sub.Ports) != 1 {
if len(sub.Ports) != len(ports) {
return false, false
}
p := &sub.Ports[0]
if p.Port != port || p.Protocol != api.ProtocolTCP {
return false, false
for i, p := range ports {
ep := &sub.Ports[i]
if p.Port != ep.Port || p.Protocol != ep.Protocol || p.Name != ep.Name {
return false, false
}
}
for _, addr := range sub.Addresses {
if addr.IP == ip {

View File

@ -34,51 +34,51 @@ func TestSetEndpoints(t *testing.T) {
testName string
serviceName string
ip string
port int
endpointPorts []api.EndpointPort
additionalMasters int
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
}{
{
testName: "no existing endpoints",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
endpoints: nil,
testName: "no existing endpoints",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil,
expectUpdate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "existing endpoints satisfy",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
testName: "existing endpoints satisfy",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
},
{
testName: "existing endpoints satisfy but too many",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
testName: "existing endpoints satisfy but too many",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -86,7 +86,7 @@ func TestSetEndpoints(t *testing.T) {
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -94,7 +94,7 @@ func TestSetEndpoints(t *testing.T) {
testName: "existing endpoints satisfy but too many + extra masters",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
additionalMasters: 3,
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
@ -107,7 +107,7 @@ func TestSetEndpoints(t *testing.T) {
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -120,7 +120,7 @@ func TestSetEndpoints(t *testing.T) {
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
@ -128,7 +128,7 @@ func TestSetEndpoints(t *testing.T) {
testName: "existing endpoints satisfy but too many + extra masters + delete first",
serviceName: "foo",
ip: "4.3.2.4",
port: 8080,
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
additionalMasters: 3,
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
@ -141,7 +141,7 @@ func TestSetEndpoints(t *testing.T) {
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -154,21 +154,21 @@ func TestSetEndpoints(t *testing.T) {
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "existing endpoints wrong name",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
testName: "existing endpoints wrong name",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("bar"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -176,21 +176,21 @@ func TestSetEndpoints(t *testing.T) {
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "existing endpoints wrong IP",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
testName: "existing endpoints wrong IP",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
@ -198,21 +198,21 @@ func TestSetEndpoints(t *testing.T) {
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "existing endpoints wrong port",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
testName: "existing endpoints wrong port",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 9090, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}},
}},
}},
},
@ -220,21 +220,21 @@ func TestSetEndpoints(t *testing.T) {
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "existing endpoints wrong protocol",
serviceName: "foo",
ip: "1.2.3.4",
port: 8080,
testName: "existing endpoints wrong protocol",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "UDP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}},
}},
}},
},
@ -242,7 +242,80 @@ func TestSetEndpoints(t *testing.T) {
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "existing endpoints wrong port name",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
}},
},
},
{
testName: "existing endpoints extra service ports satisfy",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"},
},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"},
},
}},
}},
},
},
{
testName: "existing endpoints extra service ports missing port",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
},
endpoints: &api.EndpointsList{
Items: []api.Endpoints{{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []api.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"},
},
}},
},
},
@ -253,13 +326,13 @@ func TestSetEndpoints(t *testing.T) {
Endpoints: test.endpoints,
}
master.EndpointRegistry = registry
err := master.SetEndpoints(test.serviceName, net.ParseIP(test.ip), test.port)
err := master.SetEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: (%v). Expected exactly 1 change. ", test.testName, registry.Updates)
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}

View File

@ -246,6 +246,18 @@ type Config struct {
// Used to start and monitor tunneling
Tunneler Tunneler
// Additional ports to be exposed on the master service
// extraServicePorts is injectable in the event that more ports
// (other than the default 443/tcp) are exposed on the master
// and those ports need to be load balanced by the master
// service because this pkg is linked by out-of-tree projects
// like openshift which want to use the master but also do
// more stuff.
ExtraServicePorts []api.ServicePort
// Additional ports to be exposed on the master endpoints
// Port names should align with ports defined in ExtraServicePorts
ExtraEndpointPorts []api.EndpointPort
KubernetesServiceNodePort int
}
@ -288,6 +300,8 @@ type Master struct {
serviceReadWriteIP net.IP
serviceReadWritePort int
masterServices *util.Runner
extraServicePorts []api.ServicePort
extraEndpointPorts []api.EndpointPort
// storage contains the RESTful endpoints exposed by this master
storage map[string]rest.Storage
@ -450,6 +464,8 @@ func New(c *Config) *Master {
serviceReadWriteIP: c.ServiceReadWriteIP,
// TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443
serviceReadWritePort: 443,
extraServicePorts: c.ExtraServicePorts,
extraEndpointPorts: c.ExtraEndpointPorts,
tunneler: c.Tunneler,
@ -757,6 +773,8 @@ func (m *Master) NewBootstrapController() *Controller {
ServiceIP: m.serviceReadWriteIP,
ServicePort: m.serviceReadWritePort,
ExtraServicePorts: m.extraServicePorts,
ExtraEndpointPorts: m.extraEndpointPorts,
PublicServicePort: m.publicReadWritePort,
KubernetesServiceNodePort: m.KubernetesServiceNodePort,
}

View File

@ -211,6 +211,35 @@ func TestNewBootstrapController(t *testing.T) {
assert.Equal(controller.PublicServicePort, master.publicReadWritePort)
}
// TestControllerServicePorts verifies master extraServicePorts are
// correctly copied into controller
func TestControllerServicePorts(t *testing.T) {
master, _, assert := setUp(t)
master.namespaceRegistry = namespace.NewRegistry(nil)
master.serviceRegistry = registrytest.NewServiceRegistry()
master.endpointRegistry = endpoint.NewRegistry(nil)
master.extraServicePorts = []api.ServicePort{
{
Name: "additional-port-1",
Port: 1000,
Protocol: api.ProtocolTCP,
TargetPort: util.NewIntOrStringFromInt(1000),
},
{
Name: "additional-port-2",
Port: 1010,
Protocol: api.ProtocolTCP,
TargetPort: util.NewIntOrStringFromInt(1010),
},
}
controller := master.NewBootstrapController()
assert.Equal(1000, controller.ExtraServicePorts[0].Port)
assert.Equal(1010, controller.ExtraServicePorts[1].Port)
}
// TestNewHandlerContainer verifies that NewHandlerContainer uses the
// mux provided
func TestNewHandlerContainer(t *testing.T) {