diff --git a/pkg/master/controller.go b/pkg/master/controller.go index 41fd7cb25cd..38e871d489b 100644 --- a/pkg/master/controller.go +++ b/pkg/master/controller.go @@ -59,6 +59,8 @@ type Controller struct { ServiceIP net.IP ServicePort int + ExtraServicePorts []api.ServicePort + ExtraEndpointPorts []api.EndpointPort PublicServicePort int KubernetesServiceNodePort int @@ -111,10 +113,12 @@ func (c *Controller) UpdateKubernetesService() error { return err } if c.ServiceIP != nil { - if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, c.ServicePort, c.KubernetesServiceNodePort); err != nil { + servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.KubernetesServiceNodePort, "https", c.ExtraServicePorts) + if err := c.CreateMasterServiceIfNeeded("kubernetes", c.ServiceIP, servicePorts, serviceType); err != nil { return err } - if err := c.SetEndpoints("kubernetes", c.PublicIP, c.PublicServicePort); err != nil { + endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) + if err := c.SetEndpoints("kubernetes", c.PublicIP, endpointPorts); err != nil { return err } } @@ -143,31 +147,44 @@ func (c *Controller) CreateNamespaceIfNeeded(ns string) error { // createPortAndServiceSpec creates an array of service ports. // If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed. -func createPortAndServiceSpec(servicePort int, nodePort int) ([]api.ServicePort, api.ServiceType) { +func createPortAndServiceSpec(servicePort int, nodePort int, servicePortName string, extraServicePorts []api.ServicePort) ([]api.ServicePort, api.ServiceType) { //Use the Cluster IP type for the service port if NodePort isn't provided. //Otherwise, we will be binding the master service to a NodePort. - if nodePort <= 0 { - return []api.ServicePort{{Protocol: api.ProtocolTCP, - Port: servicePort, - TargetPort: util.NewIntOrStringFromInt(servicePort)}}, api.ServiceTypeClusterIP - } - return []api.ServicePort{{Protocol: api.ProtocolTCP, + servicePorts := []api.ServicePort{{Protocol: api.ProtocolTCP, Port: servicePort, - TargetPort: util.NewIntOrStringFromInt(servicePort), - NodePort: nodePort, - }}, api.ServiceTypeNodePort + Name: servicePortName, + TargetPort: util.NewIntOrStringFromInt(servicePort)}} + serviceType := api.ServiceTypeClusterIP + if nodePort > 0 { + servicePorts[0].NodePort = nodePort + serviceType = api.ServiceTypeNodePort + } + if extraServicePorts != nil { + servicePorts = append(servicePorts, extraServicePorts...) + } + return servicePorts, serviceType +} + +// createEndpointPortSpec creates an array of endpoint ports +func createEndpointPortSpec(endpointPort int, endpointPortName string, extraEndpointPorts []api.EndpointPort) []api.EndpointPort { + endpointPorts := []api.EndpointPort{{Protocol: api.ProtocolTCP, + Port: endpointPort, + Name: endpointPortName, + }} + if extraEndpointPorts != nil { + endpointPorts = append(endpointPorts, extraEndpointPorts...) + } + return endpointPorts } // CreateMasterServiceIfNeeded will create the specified service if it // doesn't already exist. -func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePort, nodePort int) error { +func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []api.ServicePort, serviceType api.ServiceType) error { ctx := api.NewDefaultContext() if _, err := c.ServiceRegistry.GetService(ctx, serviceName); err == nil { // The service already exists. return nil } - - ports, serviceType := createPortAndServiceSpec(servicePort, nodePort) svc := &api.Service{ ObjectMeta: api.ObjectMeta{ Name: serviceName, @@ -175,7 +192,7 @@ func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP n Labels: map[string]string{"provider": "kubernetes", "component": "apiserver"}, }, Spec: api.ServiceSpec{ - Ports: ports, + Ports: servicePorts, // maintained by this code, not by the pod selector Selector: nil, ClusterIP: serviceIP.String(), @@ -207,7 +224,7 @@ func (c *Controller) CreateMasterServiceIfNeeded(serviceName string, serviceIP n // to be running (c.masterCount). // * SetEndpoints is called periodically from all apiservers. // -func (c *Controller) SetEndpoints(serviceName string, ip net.IP, port int) error { +func (c *Controller) SetEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort) error { ctx := api.NewDefaultContext() e, err := c.EndpointRegistry.GetEndpoints(ctx, serviceName) if err != nil { @@ -220,13 +237,13 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, port int) error } // First, determine if the endpoint is in the format we expect (one - // subset, one port, N IP addresses). - formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), port, c.MasterCount) + // subset, ports matching endpointPorts, N IP addresses). + formatCorrect, ipCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, c.MasterCount) if !formatCorrect { // Something is egregiously wrong, just re-make the endpoints record. e.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: ip.String()}}, - Ports: []api.EndpointPort{{Port: port, Protocol: api.ProtocolTCP}}, + Ports: endpointPorts, }} glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) return c.EndpointRegistry.UpdateEndpoints(ctx, e) @@ -259,17 +276,19 @@ func (c *Controller) SetEndpoints(serviceName string, ip net.IP, port int) error // Determine if the endpoint is in the format SetEndpoints expect (one subset, // one port, N IP addresses); and if the specified IP address is present and // the correct number of ip addresses are found. -func checkEndpointSubsetFormat(e *api.Endpoints, ip string, port int, count int) (formatCorrect, ipCorrect bool) { +func checkEndpointSubsetFormat(e *api.Endpoints, ip string, ports []api.EndpointPort, count int) (formatCorrect, ipCorrect bool) { if len(e.Subsets) != 1 { return false, false } sub := &e.Subsets[0] - if len(sub.Ports) != 1 { + if len(sub.Ports) != len(ports) { return false, false } - p := &sub.Ports[0] - if p.Port != port || p.Protocol != api.ProtocolTCP { - return false, false + for i, p := range ports { + ep := &sub.Ports[i] + if p.Port != ep.Port || p.Protocol != ep.Protocol || p.Name != ep.Name { + return false, false + } } for _, addr := range sub.Addresses { if addr.IP == ip { diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index e7a4acf8483..fc826a85ff3 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -34,51 +34,51 @@ func TestSetEndpoints(t *testing.T) { testName string serviceName string ip string - port int + endpointPorts []api.EndpointPort additionalMasters int endpoints *api.EndpointsList expectUpdate *api.Endpoints // nil means none expected }{ { - testName: "no existing endpoints", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, - endpoints: nil, + testName: "no existing endpoints", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + endpoints: nil, expectUpdate: &api.Endpoints{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, { - testName: "existing endpoints satisfy", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, + testName: "existing endpoints satisfy", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, }, { - testName: "existing endpoints satisfy but too many", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, + testName: "existing endpoints satisfy but too many", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -86,7 +86,7 @@ func TestSetEndpoints(t *testing.T) { ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -94,7 +94,7 @@ func TestSetEndpoints(t *testing.T) { testName: "existing endpoints satisfy but too many + extra masters", serviceName: "foo", ip: "1.2.3.4", - port: 8080, + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, additionalMasters: 3, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ @@ -107,7 +107,7 @@ func TestSetEndpoints(t *testing.T) { {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -120,7 +120,7 @@ func TestSetEndpoints(t *testing.T) { {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, @@ -128,7 +128,7 @@ func TestSetEndpoints(t *testing.T) { testName: "existing endpoints satisfy but too many + extra masters + delete first", serviceName: "foo", ip: "4.3.2.4", - port: 8080, + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, additionalMasters: 3, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ @@ -141,7 +141,7 @@ func TestSetEndpoints(t *testing.T) { {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -154,21 +154,21 @@ func TestSetEndpoints(t *testing.T) { {IP: "4.3.2.3"}, {IP: "4.3.2.4"}, }, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, { - testName: "existing endpoints wrong name", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, + testName: "existing endpoints wrong name", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ ObjectMeta: om("bar"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -176,21 +176,21 @@ func TestSetEndpoints(t *testing.T) { ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, { - testName: "existing endpoints wrong IP", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, + testName: "existing endpoints wrong IP", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "4.3.2.1"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }}, }, @@ -198,21 +198,21 @@ func TestSetEndpoints(t *testing.T) { ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, { - testName: "existing endpoints wrong port", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, + testName: "existing endpoints wrong port", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 9090, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}, }}, }}, }, @@ -220,21 +220,21 @@ func TestSetEndpoints(t *testing.T) { ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, }}, }, }, { - testName: "existing endpoints wrong protocol", - serviceName: "foo", - ip: "1.2.3.4", - port: 8080, + testName: "existing endpoints wrong protocol", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpoints: &api.EndpointsList{ Items: []api.Endpoints{{ ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "UDP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}, }}, }}, }, @@ -242,7 +242,80 @@ func TestSetEndpoints(t *testing.T) { ObjectMeta: om("foo"), Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, - Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + { + testName: "existing endpoints wrong port name", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, + }}, + }, + }, + { + testName: "existing endpoints extra service ports satisfy", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{ + {Name: "foo", Port: 8080, Protocol: "TCP"}, + {Name: "bar", Port: 1000, Protocol: "TCP"}, + {Name: "baz", Port: 1010, Protocol: "TCP"}, + }, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{ + {Name: "foo", Port: 8080, Protocol: "TCP"}, + {Name: "bar", Port: 1000, Protocol: "TCP"}, + {Name: "baz", Port: 1010, Protocol: "TCP"}, + }, + }}, + }}, + }, + }, + { + testName: "existing endpoints extra service ports missing port", + serviceName: "foo", + ip: "1.2.3.4", + endpointPorts: []api.EndpointPort{ + {Name: "foo", Port: 8080, Protocol: "TCP"}, + {Name: "bar", Port: 1000, Protocol: "TCP"}, + }, + endpoints: &api.EndpointsList{ + Items: []api.Endpoints{{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, + }}, + }}, + }, + expectUpdate: &api.Endpoints{ + ObjectMeta: om("foo"), + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{ + {Name: "foo", Port: 8080, Protocol: "TCP"}, + {Name: "bar", Port: 1000, Protocol: "TCP"}, + }, }}, }, }, @@ -253,13 +326,13 @@ func TestSetEndpoints(t *testing.T) { Endpoints: test.endpoints, } master.EndpointRegistry = registry - err := master.SetEndpoints(test.serviceName, net.ParseIP(test.ip), test.port) + err := master.SetEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) } if test.expectUpdate != nil { if len(registry.Updates) != 1 { - t.Errorf("case %q: unexpected updates: (%v). Expected exactly 1 change. ", test.testName, registry.Updates) + t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates) } else if e, a := test.expectUpdate, ®istry.Updates[0]; !reflect.DeepEqual(e, a) { t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) } diff --git a/pkg/master/master.go b/pkg/master/master.go index 0c63eb7be6a..0f886fe8c5a 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -246,6 +246,18 @@ type Config struct { // Used to start and monitor tunneling Tunneler Tunneler + // Additional ports to be exposed on the master service + // extraServicePorts is injectable in the event that more ports + // (other than the default 443/tcp) are exposed on the master + // and those ports need to be load balanced by the master + // service because this pkg is linked by out-of-tree projects + // like openshift which want to use the master but also do + // more stuff. + ExtraServicePorts []api.ServicePort + // Additional ports to be exposed on the master endpoints + // Port names should align with ports defined in ExtraServicePorts + ExtraEndpointPorts []api.EndpointPort + KubernetesServiceNodePort int } @@ -288,6 +300,8 @@ type Master struct { serviceReadWriteIP net.IP serviceReadWritePort int masterServices *util.Runner + extraServicePorts []api.ServicePort + extraEndpointPorts []api.EndpointPort // storage contains the RESTful endpoints exposed by this master storage map[string]rest.Storage @@ -450,6 +464,8 @@ func New(c *Config) *Master { serviceReadWriteIP: c.ServiceReadWriteIP, // TODO: serviceReadWritePort should be passed in as an argument, it may not always be 443 serviceReadWritePort: 443, + extraServicePorts: c.ExtraServicePorts, + extraEndpointPorts: c.ExtraEndpointPorts, tunneler: c.Tunneler, @@ -757,6 +773,8 @@ func (m *Master) NewBootstrapController() *Controller { ServiceIP: m.serviceReadWriteIP, ServicePort: m.serviceReadWritePort, + ExtraServicePorts: m.extraServicePorts, + ExtraEndpointPorts: m.extraEndpointPorts, PublicServicePort: m.publicReadWritePort, KubernetesServiceNodePort: m.KubernetesServiceNodePort, } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 3ed67fc8b03..ba393f7d9b9 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -211,6 +211,35 @@ func TestNewBootstrapController(t *testing.T) { assert.Equal(controller.PublicServicePort, master.publicReadWritePort) } +// TestControllerServicePorts verifies master extraServicePorts are +// correctly copied into controller +func TestControllerServicePorts(t *testing.T) { + master, _, assert := setUp(t) + master.namespaceRegistry = namespace.NewRegistry(nil) + master.serviceRegistry = registrytest.NewServiceRegistry() + master.endpointRegistry = endpoint.NewRegistry(nil) + + master.extraServicePorts = []api.ServicePort{ + { + Name: "additional-port-1", + Port: 1000, + Protocol: api.ProtocolTCP, + TargetPort: util.NewIntOrStringFromInt(1000), + }, + { + Name: "additional-port-2", + Port: 1010, + Protocol: api.ProtocolTCP, + TargetPort: util.NewIntOrStringFromInt(1010), + }, + } + + controller := master.NewBootstrapController() + + assert.Equal(1000, controller.ExtraServicePorts[0].Port) + assert.Equal(1010, controller.ExtraServicePorts[1].Port) +} + // TestNewHandlerContainer verifies that NewHandlerContainer uses the // mux provided func TestNewHandlerContainer(t *testing.T) {