diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index ed4b1a0e15f..6e50d76b879 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -24,6 +24,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -237,7 +239,13 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { func(ep *api.Endpoint, c fuzz.Continue) { // TODO: If our API used a particular type for IP fields we could just catch that here. ep.IP = fmt.Sprintf("%d.%d.%d.%d", c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256)) - ep.Port = c.Rand.Intn(65536) + // TODO: Once we drop single-port APIs, make this fuzz + // multiple ports and fuzz port.name. This will force + // a compile error when those APIs are deleted. + _ = v1beta1.Dependency + _ = v1beta2.Dependency + ep.Ports = []api.EndpointPort{{Name: "", Port: c.Rand.Intn(65536)}} + c.Fuzz(&ep.Ports[0].Protocol) }, ) return f diff --git a/pkg/api/types.go b/pkg/api/types.go index c1b90936eab..10d785ce9ad 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -750,9 +750,6 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` - // Optional: The IP protocol for these endpoints. Supports "TCP" and - // "UDP". Defaults to "TCP". - Protocol Protocol `json:"protocol,omitempty"` Endpoints []Endpoint `json:"endpoints,omitempty"` } @@ -762,8 +759,21 @@ type Endpoint struct { // TODO: This should allow hostname or IP, see #4447. IP string `json:"ip"` - // Required: The destination port to access. - Port int `json:"port"` + // The ports exposed on this IP. + Ports []EndpointPort +} + +type EndpointPort struct { + // Optional if only one port is defined in this Endpoint. + // The name of this port within the larger service/endpoint structure. + // This must be a DNS_LABEL. + Name string + + // The IP protocol for this port. Supports "TCP" and "UDP". + Protocol Protocol + + // The destination port to access. + Port int } // EndpointsList is a list of endpoints. diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index 2f178b4de8e..77c5c3d2c90 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -1121,12 +1121,16 @@ func init() { if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { ep := &in.Endpoints[i] - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + // newer.Endpoints.Endpoints[i].Ports is an array - take the first one. + if len(ep.Ports) > 0 { + port := &ep.Ports[0] + if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port))) + } } return nil }, @@ -1137,22 +1141,20 @@ func init() { if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { - out.Endpoints = append(out.Endpoints, newer.Endpoint{}) - ep := &out.Endpoints[i] host, port, err := net.SplitHostPort(in.Endpoints[i]) if err != nil { return err } - ep.IP = host pn, err := strconv.Atoi(port) if err != nil { return err } - ep.Port = pn + epp := newer.EndpointPort{Port: pn} + if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}}) } return nil }, diff --git a/pkg/api/v1beta1/conversion_test.go b/pkg/api/v1beta1/conversion_test.go index df0009b4198..f5507ce6065 100644 --- a/pkg/api/v1beta1/conversion_test.go +++ b/pkg/api/v1beta1/conversion_test.go @@ -390,41 +390,35 @@ func TestEndpointsConversion(t *testing.T) { }{ { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "empty", - }, - Protocol: current.ProtocolTCP, + Protocol: "", Endpoints: []string{}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, Endpoints: []newer.Endpoint{}, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "one", - }, Protocol: current.ProtocolTCP, Endpoints: []string{"1.2.3.4:88"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}}, + }, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "several", - }, Protocol: current.ProtocolUDP, Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolUDP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}}, + }, }, }, } @@ -436,7 +430,7 @@ func TestEndpointsConversion(t *testing.T) { t.Errorf("[Case: %d] Unexpected error: %v", i, err) continue } - if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) } diff --git a/pkg/api/v1beta1/defaults.go b/pkg/api/v1beta1/defaults.go index 87e66eb7232..e71db4f6ffa 100644 --- a/pkg/api/v1beta1/defaults.go +++ b/pkg/api/v1beta1/defaults.go @@ -86,7 +86,7 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" { + if obj.Protocol == "" && len(obj.Endpoints) > 0 { obj.Protocol = "TCP" } }, diff --git a/pkg/api/v1beta1/defaults_test.go b/pkg/api/v1beta1/defaults_test.go index 1a4cd66995c..2127ee0f647 100644 --- a/pkg/api/v1beta1/defaults_test.go +++ b/pkg/api/v1beta1/defaults_test.go @@ -103,11 +103,21 @@ func TestSetDefaultSecret(t *testing.T) { } } -func TestSetDefaulEndpointsProtocol(t *testing.T) { +func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) { in := ¤t.Endpoints{} obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) + if out.Protocol != "" { + t.Errorf("Expected protocol \"\", got %s", out.Protocol) + } +} + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{Endpoints: []string{"1.2.3.4:5678"}} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + if out.Protocol != current.ProtocolTCP { t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) } diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index f61f806f08a..d98c038230c 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -24,6 +24,12 @@ import ( // Codec encodes internal objects to the v1beta1 scheme var Codec = runtime.CodecFor(api.Scheme, "v1beta1") +// Dependency does nothing but give a hook for other packages to force a +// compile-time error when this API version is eventually removed. This is +// useful, for example, to clean up things that are implicitly tied to +// semantics of older APIs. +const Dependency = true + func init() { api.Scheme.AddKnownTypes("v1beta1", &Pod{}, diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index 9c2d73058bd..c038c913153 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -1036,12 +1036,16 @@ func init() { if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { ep := &in.Endpoints[i] - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + // newer.Endpoints.Endpoints[i].Ports is an array - take the first one. + if len(ep.Ports) > 0 { + port := &ep.Ports[0] + if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port))) + } } return nil }, @@ -1052,22 +1056,20 @@ func init() { if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { return err } - if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { - return err - } for i := range in.Endpoints { - out.Endpoints = append(out.Endpoints, newer.Endpoint{}) - ep := &out.Endpoints[i] host, port, err := net.SplitHostPort(in.Endpoints[i]) if err != nil { return err } - ep.IP = host pn, err := strconv.Atoi(port) if err != nil { return err } - ep.Port = pn + epp := newer.EndpointPort{Port: pn} + if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil { + return err + } + out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}}) } return nil }, diff --git a/pkg/api/v1beta2/conversion_test.go b/pkg/api/v1beta2/conversion_test.go index b606476466d..ac88b233d40 100644 --- a/pkg/api/v1beta2/conversion_test.go +++ b/pkg/api/v1beta2/conversion_test.go @@ -220,41 +220,35 @@ func TestEndpointsConversion(t *testing.T) { }{ { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "empty", - }, - Protocol: current.ProtocolTCP, + Protocol: "", Endpoints: []string{}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, Endpoints: []newer.Endpoint{}, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "one", - }, Protocol: current.ProtocolTCP, Endpoints: []string{"1.2.3.4:88"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolTCP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}}, + }, }, }, { given: current.Endpoints{ - TypeMeta: current.TypeMeta{ - ID: "several", - }, Protocol: current.ProtocolUDP, Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, }, expected: newer.Endpoints{ - Protocol: newer.ProtocolUDP, - Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, + Endpoints: []newer.Endpoint{ + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}}, + {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}}, + }, }, }, } @@ -266,7 +260,7 @@ func TestEndpointsConversion(t *testing.T) { t.Errorf("[Case: %d] Unexpected error: %v", i, err) continue } - if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) } diff --git a/pkg/api/v1beta2/defaults.go b/pkg/api/v1beta2/defaults.go index 0d5dc090e59..c5bbdebf830 100644 --- a/pkg/api/v1beta2/defaults.go +++ b/pkg/api/v1beta2/defaults.go @@ -88,7 +88,7 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" { + if obj.Protocol == "" && len(obj.Endpoints) > 0 { obj.Protocol = "TCP" } }, diff --git a/pkg/api/v1beta2/defaults_test.go b/pkg/api/v1beta2/defaults_test.go index 12cf2aef5b2..1141f53d343 100644 --- a/pkg/api/v1beta2/defaults_test.go +++ b/pkg/api/v1beta2/defaults_test.go @@ -103,11 +103,21 @@ func TestSetDefaultSecret(t *testing.T) { } } -func TestSetDefaulEndpointsProtocol(t *testing.T) { +func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) { in := ¤t.Endpoints{} obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) + if out.Protocol != "" { + t.Errorf("Expected protocol \"\", got %s", out.Protocol) + } +} + +func TestSetDefaulEndpointsProtocol(t *testing.T) { + in := ¤t.Endpoints{Endpoints: []string{"1.2.3.4:5678"}} + obj := roundTrip(t, runtime.Object(in)) + out := obj.(*current.Endpoints) + if out.Protocol != current.ProtocolTCP { t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) } diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index 990aa7b2039..39612227928 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -24,6 +24,12 @@ import ( // Codec encodes internal objects to the v1beta2 scheme var Codec = runtime.CodecFor(api.Scheme, "v1beta2") +// Dependency does nothing but give a hook for other packages to force a +// compile-time error when this API version is eventually removed. This is +// useful, for example, to clean up things that are implicitly tied to +// semantics of older APIs. +const Dependency = true + func init() { api.Scheme.AddKnownTypes("v1beta2", &Pod{}, diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go index d136fd088ff..03fe748706a 100644 --- a/pkg/api/v1beta3/defaults.go +++ b/pkg/api/v1beta3/defaults.go @@ -81,8 +81,14 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" { - obj.Protocol = "TCP" + for i := range obj.Endpoints { + ep := &obj.Endpoints[i] + for j := range ep.Ports { + port := &ep.Ports[j] + if port.Protocol == "" { + port.Protocol = ProtocolTCP + } + } } }, ) diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go index be2e3d5aab0..2ac72ae3360 100644 --- a/pkg/api/v1beta3/defaults_test.go +++ b/pkg/api/v1beta3/defaults_test.go @@ -104,11 +104,25 @@ func TestSetDefaultSecret(t *testing.T) { } func TestSetDefaulEndpointsProtocol(t *testing.T) { - in := ¤t.Endpoints{} + in := ¤t.Endpoints{ + Endpoints: []current.Endpoint{ + {IP: "1.2.3.4", Ports: []current.EndpointPort{ + {Protocol: "TCP"}, + {Protocol: "UDP"}, + {Protocol: ""}, + }}, + }, + } obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) - if out.Protocol != current.ProtocolTCP { - t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) + if out.Endpoints[0].Ports[0].Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol[0] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[0].Protocol) + } + if out.Endpoints[0].Ports[1].Protocol != current.ProtocolUDP { + t.Errorf("Expected protocol[1] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[1].Protocol) + } + if out.Endpoints[0].Ports[2].Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol[2] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[2].Protocol) } } diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index a09358135d7..73b2f9e7030 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -782,10 +782,6 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` - // Optional: The IP protocol for these endpoints. Supports "TCP" and - // "UDP". Defaults to "TCP". - Protocol Protocol `json:"protocol,omitempty"` - Endpoints []Endpoint `json:"endpoints,omitempty"` } @@ -795,6 +791,20 @@ type Endpoint struct { // TODO: This should allow hostname or IP, see #4447. IP string `json:"ip"` + // The ports exposed on this IP. + Ports []EndpointPort `json:"ports,omitempty"` +} + +type EndpointPort struct { + // Optional if only one port is defined in this Endpoint, otherwise required. + // The name of this port within the larger service/endpoint structure. + // This must be a DNS_LABEL. + Name string `json:"name,omitempty"` + + // Optional: The IP protocol for this port. Supports "TCP" and "UDP". + // Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` + // Required: The destination port to access. Port int `json:"port"` } diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 096b887dbcf..e3dc1d213a5 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -616,7 +616,9 @@ func TestListEndpooints(t *testing.T) { { ObjectMeta: api.ObjectMeta{Name: "endpoint-1"}, Endpoints: []api.Endpoint{ - {IP: "10.245.1.2", Port: 8080}, {IP: "10.245.1.3", Port: 8080}}, + {IP: "10.245.1.2", Ports: []api.EndpointPort{{Port: 8080}}}, + {IP: "10.245.1.3", Ports: []api.EndpointPort{{Port: 8080}}}, + }, }, }, }, diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index 330ebe721fc..231a1cb05cb 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -22,10 +22,8 @@ import ( "fmt" "io" "io/ioutil" - "net" "reflect" "sort" - "strconv" "strings" "text/tabwriter" "text/template" @@ -270,10 +268,11 @@ func formatEndpoints(endpoints []api.Endpoint) string { return "" } list := []string{} - for i := range endpoints { - ep := &endpoints[i] - list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) - } + //FIXME: What do we want to print, now that endpoints are more complex? + //for i := range endpoints { + // ep := &endpoints[i] + // list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + //} return strings.Join(list, ",") } diff --git a/pkg/kubectl/resource_printer_test.go b/pkg/kubectl/resource_printer_test.go index cc6f7ab6b6b..fff9bf09ffd 100644 --- a/pkg/kubectl/resource_printer_test.go +++ b/pkg/kubectl/resource_printer_test.go @@ -452,7 +452,10 @@ func TestPrinters(t *testing.T) { "pod": &api.Pod{ObjectMeta: om("pod")}, "emptyPodList": &api.PodList{}, "nonEmptyPodList": &api.PodList{Items: []api.Pod{{}}}, - "endpoints": &api.Endpoints{Endpoints: []api.Endpoint{{IP: "127.0.0.1"}, {IP: "localhost", Port: 8080}}}, + "endpoints": &api.Endpoints{Endpoints: []api.Endpoint{ + {IP: "127.0.0.1"}, + {IP: "localhost", Ports: []api.EndpointPort{{Port: 8080}}}, + }}, } // map of printer name to set of objects it should fail on. expectedErrors := map[string]util.StringSet{ diff --git a/pkg/master/publish.go b/pkg/master/publish.go index 7cc73559965..ce9e1b53e76 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -128,25 +128,35 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) error { ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) - if err != nil || e.Protocol != api.ProtocolTCP { + if err != nil { e = &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: serviceName, Namespace: api.NamespaceDefault, }, - Protocol: api.ProtocolTCP, } } found := false +FindEndpointLoop: for i := range e.Endpoints { ep := &e.Endpoints[i] - if ep.IP == ip.String() && ep.Port == port { - found = true - break + if ep.IP == ip.String() { + for j := range ep.Ports { + epp := &ep.Ports[j] + if epp.Protocol == api.ProtocolTCP && epp.Port == port { + found = true + break FindEndpointLoop + } + } } } if !found { - e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port}) + e.Endpoints = append(e.Endpoints, api.Endpoint{ + IP: ip.String(), + Ports: []api.EndpointPort{ + {Protocol: api.ProtocolTCP, Port: port}, + }, + }) } if len(e.Endpoints) > m.masterCount { // We append to the end and remove from the beginning, so this should diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 29489f839f3..1b4430fc551 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -185,7 +185,7 @@ func TestServicesFromZeroError(t *testing.T) { func TestEndpoints(t *testing.T) { endpoint := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}, } fakeWatch := watch.NewFake() @@ -235,7 +235,7 @@ func TestEndpoints(t *testing.T) { func TestEndpointsFromZero(t *testing.T) { endpoint := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}, } fakeWatch := watch.NewFake() diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/loadbalancer.go index a94665383d8..bb9f93bf269 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/loadbalancer.go @@ -25,8 +25,8 @@ import ( // LoadBalancer is an interface for distributing incoming requests to service endpoints. type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given - // service and source address. - NextEndpoint(service string, srcAddr net.Addr) (string, error) - NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error - CleanupStaleStickySessions(service string) + // serviceName:portName and source address. + NextEndpoint(service string, port string, srcAddr net.Addr) (string, error) + NewService(service string, port string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error + CleanupStaleStickySessions(service string, port string) } diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index 245689cc134..e2b08522362 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -69,7 +69,8 @@ type tcpProxySocket struct { func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { for _, retryTimeout := range endpointDialTimeout { - endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) + // TODO: support multiple service ports + endpoint, err := proxier.loadBalancer.NextEndpoint(service, "", srcAddr) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -383,7 +384,8 @@ func (proxier *Proxier) ensurePortals() { func (proxier *Proxier) cleanupStaleStickySessions() { for name, info := range proxier.serviceMap { if info.sessionAffinityType != api.AffinityTypeNone { - proxier.loadBalancer.CleanupStaleStickySessions(name) + // TODO: support multiple service ports + proxier.loadBalancer.CleanupStaleStickySessions(name, "") } } } @@ -499,7 +501,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { if err != nil { glog.Errorf("Failed to open portal for %q: %v", service.Name, err) } - proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes) + // TODO: support multiple service ports + proxier.loadBalancer.NewService(service.Name, "", info.sessionAffinityType, info.stickyMaxAgeMinutes) } proxier.mu.Lock() defer proxier.mu.Unlock() diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 5215bb546eb..71c52b43924 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -197,7 +197,7 @@ func TestTCPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -217,7 +217,7 @@ func TestUDPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -246,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -277,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -308,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -338,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -368,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -407,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) @@ -446,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, }, }) @@ -482,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, }, }) diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 9af7cc990e2..384025774ef 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -49,18 +49,26 @@ type affinityPolicy struct { ttlMinutes int } -// balancerKey is a string that the balancer uses to key stored state. +// balancerKey is a string that the balancer uses to key stored state. It is +// formatted as "service_name:port_name", but that should be opaque to most consumers. type balancerKey string +func makeBalancerKey(service, port string) balancerKey { + return balancerKey(fmt.Sprintf("%s:%s", service, port)) +} + // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { lock sync.RWMutex services map[balancerKey]*balancerState } +// Ensure this implements LoadBalancer. +var _ LoadBalancer = &LoadBalancerRR{} + type balancerState struct { - endpoints []string - index int + endpoints []string // a list of "ip:port" style strings + index int // index into endpoints affinity affinityPolicy } @@ -79,20 +87,20 @@ func NewLoadBalancerRR() *LoadBalancerRR { } } -func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error { +func (lb *LoadBalancerRR) NewService(service, port string, affinityType api.AffinityType, ttlMinutes int) error { lb.lock.Lock() defer lb.lock.Unlock() - lb.newServiceInternal(service, affinityType, ttlMinutes) + lb.newServiceInternal(service, port, affinityType, ttlMinutes) return nil } -func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState { +func (lb *LoadBalancerRR) newServiceInternal(service, port string, affinityType api.AffinityType, ttlMinutes int) *balancerState { if ttlMinutes == 0 { ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? } - key := balancerKey(service) + key := makeBalancerKey(service, port) if _, exists := lb.services[key]; !exists { lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service) @@ -111,13 +119,13 @@ func isSessionAffinity(affinity *affinityPolicy) bool { // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. -func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { +func (lb *LoadBalancerRR) NextEndpoint(service, port string, srcAddr net.Addr) (string, error) { // Coarse locking is simple. We can get more fine-grained if/when we // can prove it matters. lb.lock.Lock() defer lb.lock.Unlock() - key := balancerKey(service) + key := makeBalancerKey(service, port) state, exists := lb.services[key] if !exists || state == nil { return "", ErrMissingServiceEntry @@ -166,18 +174,22 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string return endpoint, nil } -func isValidEndpoint(ep *api.Endpoint) bool { - return ep.IP != "" && ep.Port > 0 +type hostPortPair struct { + host string + port int } -func filterValidEndpoints(endpoints []api.Endpoint) []string { - // Convert Endpoint objects into strings for easier use later. Ignore - // the protocol field - we'll get that from the Service objects. +func isValidEndpoint(hpp *hostPortPair) bool { + return hpp.host != "" && hpp.port > 0 +} + +func getValidEndpoints(pairs []hostPortPair) []string { + // Convert structs into strings for easier use later. var result []string - for i := range endpoints { - ep := &endpoints[i] - if isValidEndpoint(ep) { - result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + for i := range pairs { + hpp := &pairs[i] + if isValidEndpoint(hpp) { + result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) } } return result @@ -225,27 +237,45 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { defer lb.lock.Unlock() // Update endpoints for services. - for _, svcEndpoints := range allEndpoints { - key := balancerKey(svcEndpoints.Name) - state, exists := lb.services[key] - curEndpoints := []string{} - if state != nil { - curEndpoints = state.endpoints - } - newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints) - if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { - glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints) - lb.updateAffinityMap(key, newEndpoints) - // On update can be called without NewService being called externally. - // To be safe we will call it here. A new service will only be created - // if one does not already exist. - state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0) - state.endpoints = slice.ShuffleStrings(newEndpoints) + for i := range allEndpoints { + svcEndpoints := &allEndpoints[i] - // Reset the round-robin index. - state.index = 0 + // We need to build a map of portname -> all ip:ports for that portname. + portsToEndpoints := map[string][]hostPortPair{} + + // Explode the Endpoints.Endpoints[*].Ports[*] into the aforementioned map. + // FIXME: this is awkward. Maybe a different factoring of Endpoints is better? + for j := range svcEndpoints.Endpoints { + ep := &svcEndpoints.Endpoints[j] + for k := range ep.Ports { + epp := &ep.Ports[k] + portsToEndpoints[epp.Name] = append(portsToEndpoints[epp.Name], hostPortPair{ep.IP, epp.Port}) + // Ignore the protocol field - we'll get that from the Service objects. + } + } + + for portname := range portsToEndpoints { + key := makeBalancerKey(svcEndpoints.Name, portname) + state, exists := lb.services[key] + curEndpoints := []string{} + if state != nil { + curEndpoints = state.endpoints + } + newEndpoints := getValidEndpoints(portsToEndpoints[portname]) + if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { + glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints) + lb.updateAffinityMap(key, newEndpoints) + // On update can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. + state = lb.newServiceInternal(svcEndpoints.Name, portname, api.AffinityTypeNone, 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) + + // Reset the round-robin index. + state.index = 0 + } + registeredEndpoints[key] = true } - registeredEndpoints[key] = true } // Remove endpoints missing from the update. for k := range lb.services { @@ -267,11 +297,11 @@ func slicesEquiv(lhs, rhs []string) bool { return false } -func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { +func (lb *LoadBalancerRR) CleanupStaleStickySessions(service, port string) { lb.lock.Lock() defer lb.lock.Unlock() - key := balancerKey(service) + key := makeBalancerKey(service, port) state, exists := lb.services[key] if !exists { glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service) diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go index c47e07eab06..0861c5ec72e 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/roundrobin_test.go @@ -24,29 +24,29 @@ import ( ) func TestValidateWorks(t *testing.T) { - if isValidEndpoint(&api.Endpoint{}) { + if isValidEndpoint(&hostPortPair{}) { t.Errorf("Didn't fail for empty string") } - if isValidEndpoint(&api.Endpoint{IP: "foobar"}) { + if isValidEndpoint(&hostPortPair{host: "foobar"}) { t.Errorf("Didn't fail with no port") } - if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) { + if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) { t.Errorf("Didn't fail with a negative port") } - if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) { + if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) { t.Errorf("Failed a valid config.") } } func TestFilterWorks(t *testing.T) { - endpoints := []api.Endpoint{ - {IP: "foobar", Port: 1}, - {IP: "foobar", Port: 2}, - {IP: "foobar", Port: -1}, - {IP: "foobar", Port: 3}, - {IP: "foobar", Port: -2}, + endpoints := []hostPortPair{ + {host: "foobar", port: 1}, + {host: "foobar", port: 2}, + {host: "foobar", port: -1}, + {host: "foobar", port: 3}, + {host: "foobar", port: -2}, } - filtered := filterValidEndpoints(endpoints) + filtered := getValidEndpoints(endpoints) if len(filtered) != 3 { t.Errorf("Failed to filter to the correct size") @@ -66,7 +66,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() var endpoints []api.Endpoints loadBalancer.OnUpdate(endpoints) - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "http", nil) if err == nil { t.Errorf("Didn't fail with non-existent service") } @@ -75,8 +75,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { } } -func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) { - endpoint, err := loadBalancer.NextEndpoint(service, netaddr) +func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, port string, expected string, netaddr net.Addr) { + endpoint, err := loadBalancer.NextEndpoint(service, port, netaddr) if err != nil { t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) } @@ -87,25 +87,41 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}}, + Endpoints: []api.Endpoint{{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 40}}}}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) - expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) +} + +func stringsInSlice(haystack []string, needles ...string) bool { + for _, needle := range needles { + found := false + for i := range haystack { + if haystack[i] == needle { + found = true + break + } + } + if found == false { + return false + } + } + return true } func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -113,22 +129,77 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) +} + +func TestLoadBalanceWorksWithMultipleEndpointsAndPorts(t *testing.T) { + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 1}, + {Name: "q", Port: 10}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 2}, + {Name: "q", Port: 20}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 3}, + {Name: "q", Port: 30}, + }}, + }, + } + loadBalancer.OnUpdate(endpoints) + + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) } func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -136,37 +207,86 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 1}, + {Name: "q", Port: 10}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 2}, + {Name: "q", Port: 20}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 3}, + {Name: "q", Port: 30}, + }}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 8}, - {IP: "endpoint", Port: 9}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 8}, + {Name: "q", Port: 80}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 9}, + {Name: "q", Port: 90}, + }}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:8", "endpoint:9") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) + + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:80", "endpoint:90") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) + // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + endpoint, err = loadBalancer.NextEndpoint("foo", "q", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -174,7 +294,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -182,133 +302,183 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 1}, + {Name: "q", Port: 10}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 2}, + {Name: "q", Port: 20}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 3}, + {Name: "q", Port: 30}, + }}, }, } endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 4}, - {IP: "endpoint", Port: 5}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 4}, + {Name: "q", Port: 40}, + }}, + {IP: "endpoint", Ports: []api.EndpointPort{ + {Name: "p", Port: 5}, + {Name: "q", Port: 50}, + }}, }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) - shuffledBarEndpoints := loadBalancer.services["bar"].endpoints - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledFooEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil) + + shuffledFooEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints + if !stringsInSlice(shuffledFooEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { + t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil) + + shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints + if !stringsInSlice(shuffledBarEndpoints, "endpoint:4", "endpoint:5") { + t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints) + } + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) + + shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "q")].endpoints + if !stringsInSlice(shuffledBarEndpoints, "endpoint:40", "endpoint:50") { + t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints) + } + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) } func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}}, - } - loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) - expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) -} - -func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { - client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} - client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} - client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} - loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) - if err == nil || len(endpoint) != 0 { - t.Errorf("Didn't fail with non-existent service") - } - - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2) + expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2) } -func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { +func TestStickyLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeNone, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) } -func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { +func TestStickyLoadBalanceWorksWithMultipleEndpointsStickyNone(t *testing.T) { + client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} + client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} + client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} + loadBalancer := NewLoadBalancerRR() + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + if err == nil || len(endpoint) != 0 { + t.Errorf("Didn't fail with non-existent service") + } + + loadBalancer.NewService("foo", "p", api.AffinityTypeNone, 0) + endpoints := make([]api.Endpoints, 1) + endpoints[0] = api.Endpoints{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Endpoints: []api.Endpoint{ + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + }, + } + loadBalancer.OnUpdate(endpoints) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1) +} + +func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} @@ -316,41 +486,44 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) client2Endpoint := shuffledEndpoints[1] - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] } else if client2Endpoint == "endpoint:3" { @@ -358,26 +531,26 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } else if client3Endpoint == "endpoint:3" { client3Endpoint = shuffledEndpoints[0] } - expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) + expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1) + expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2) + expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 4}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6) + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1) + expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2) + expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client4) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client5) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client6) } func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { @@ -385,51 +558,54 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { + t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) + } + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 4}, - {IP: "endpoint", Port: 5}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -440,58 +616,58 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 2) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 1}, - {IP: "endpoint", Port: 2}, - {IP: "endpoint", Port: 3}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, }, } - loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("bar", "p", api.AffinityTypeClientIP, 0) endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Port: 5}, - {IP: "endpoint", Port: 5}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}}, + {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 6}}}, }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.services["foo"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) + shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) - shuffledBarEndpoints := loadBalancer.services["bar"].endpoints - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - shuffledBarEndpoints = loadBalancer.services["bar"].endpoints - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) + shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "p")].endpoints + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) } diff --git a/pkg/registry/endpoint/rest_test.go b/pkg/registry/endpoint/rest_test.go index 99e00798844..7deea343ae2 100644 --- a/pkg/registry/endpoint/rest_test.go +++ b/pkg/registry/endpoint/rest_test.go @@ -27,10 +27,20 @@ import ( ) func TestGetEndpoints(t *testing.T) { + expected := []api.Endpoint{ + {IP: "127.0.0.1", Ports: []api.EndpointPort{ + {Name: "p", Port: 9000, Protocol: api.ProtocolTCP}, + {Name: "q", Port: 9000, Protocol: api.ProtocolUDP}, + }}, + {IP: "127.0.0.2", Ports: []api.EndpointPort{ + {Name: "p", Port: 8000, Protocol: api.ProtocolTCP}, + {Name: "q", Port: 8000, Protocol: api.ProtocolUDP}, + }}, + } registry := ®istrytest.ServiceRegistry{ Endpoints: api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, + Endpoints: expected, }, } storage := NewREST(registry) @@ -39,7 +49,7 @@ func TestGetEndpoints(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %#v", err) } - if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) { + if !reflect.DeepEqual(expected, obj.(*api.Endpoints).Endpoints) { t.Errorf("unexpected endpoints: %#v", obj) } } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 2384fb76a3f..cc6d77cbc0e 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -24,6 +24,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" @@ -598,10 +600,10 @@ func TestEtcdListEndpoints(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Name: "p", Port: 8345, Protocol: api.ProtocolTCP}}}}}), }, { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}), }, }, }, @@ -625,8 +627,7 @@ func TestEtcdGetEndpoints(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) endpoints := &api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Protocol: "TCP", - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 34855, Protocol: api.ProtocolTCP}}}}, } key, _ := makeServiceEndpointsKey(ctx, "foo") @@ -647,10 +648,19 @@ func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true registry := NewTestEtcdRegistry(fakeClient) + + // TODO: Once we drop single-port APIs, make this test use the + // multi-port features. This will force a compile error when those APIs + // are deleted. + _ = v1beta1.Dependency + _ = v1beta2.Dependency + endpoints := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Protocol: "TCP", - Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}}, + Endpoints: []api.Endpoint{ + {IP: "baz", Ports: []api.EndpointPort{{Port: 1, Protocol: api.ProtocolTCP}}}, + {IP: "bar", Ports: []api.EndpointPort{{Port: 2, Protocol: api.ProtocolTCP}}}, + }, } key, _ := makeServiceEndpointsKey(ctx, "foo") diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index c3690f61431..b000063e7a2 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -21,6 +21,7 @@ import ( "math/rand" "net" "strconv" + "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -218,17 +219,57 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { - eps, err := rs.registry.GetEndpoints(ctx, id) + // Allow ID as "svcname" or "svcname:port". Choose an endpoint at + // random. If the port is specified as a number, use that value + // directly. If the port is specified as a name, try to look up that + // name on the chosen endpoint. If port is not specified, try to use + // the first unnamed port on the chosen endpoint. If there are no + // unnamed ports, try to use the first defined port. + parts := strings.Split(id, ":") + if len(parts) > 2 { + return "", errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) + } + name := parts[0] + port := "" + if len(parts) == 2 { + port = parts[1] + } + + eps, err := rs.registry.GetEndpoints(ctx, name) if err != nil { return "", err } if len(eps.Endpoints) == 0 { - return "", fmt.Errorf("no endpoints available for %v", id) + return "", fmt.Errorf("no endpoints available for %v", name) } + ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] + + // Try to figure out a port. + if _, err := strconv.Atoi(port); err != nil { + // Do nothing - port is correct as is. + } else { + // Try a name lookup, even if name is "". + for i := range ep.Ports { + if ep.Ports[i].Name == port { + port = strconv.Itoa(ep.Ports[i].Port) + break + } + } + } + if port == "" { + // Still nothing - try the first defined port. + if len(ep.Ports) > 0 { + port = strconv.Itoa(ep.Ports[0].Port) + } + } + // We leave off the scheme ('http://') because we have no idea what sort of server // is listening at this endpoint. - ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] - return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil + loc := ep.IP + if port != "" { + loc += fmt.Sprintf(":%s", port) + } + return loc, nil } func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error { diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index 8426ef4134a..e001abbb23f 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -370,7 +370,7 @@ func TestServiceRegistryGet(t *testing.T) { func TestServiceRegistryResourceLocation(t *testing.T) { ctx := api.NewDefaultContext() registry := registrytest.NewServiceRegistry() - registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}} + registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Ports: []api.EndpointPort{{Port: 80}}}}} fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 3f42a9d0d8b..870880db3be 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -87,7 +87,11 @@ func (e *EndpointController) SyncServiceEndpoints() error { continue } - endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port}) + // TODO: Add multiple-ports to Service and expose them here. + endpoints = append(endpoints, api.Endpoint{ + IP: pod.Status.PodIP, + Ports: []api.EndpointPort{{Name: "", Protocol: service.Spec.Protocol, Port: port}}, + }) } currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) if err != nil { @@ -96,7 +100,6 @@ func (e *EndpointController) SyncServiceEndpoints() error { ObjectMeta: api.ObjectMeta{ Name: service.Name, }, - Protocol: service.Spec.Protocol, } } else { glog.Errorf("Error getting endpoints: %v", err) @@ -112,7 +115,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) } else { // Pre-existing - if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) { + if endpointsEqual(currentEndpoints, endpoints) { glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) continue } @@ -126,12 +129,27 @@ func (e *EndpointController) SyncServiceEndpoints() error { return resultErr } +// TODO: It would be nice if we had a util function that reflectively compared +// two slices for order-insensitive equivalence. +func portsEqual(lhs, rhs []api.EndpointPort) bool { + if len(lhs) != len(rhs) { + return false + } + for i := range lhs { + if lhs[i] != rhs[i] { + return false + } + } + return true +} + func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool { if haystack == nil || needle == nil { return false } for ix := range haystack.Endpoints { - if haystack.Endpoints[ix] == *needle { + haystackEP := &haystack.Endpoints[ix] + if haystackEP.IP == needle.IP && portsEqual(haystackEP.Ports, needle.Ports) { return true } } diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 40c20494c4c..72336b53d88 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -245,8 +245,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -277,8 +276,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -309,8 +307,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolUDP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolUDP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -340,7 +337,6 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, Endpoints: []api.Endpoint{}, }}) defer testServer.Close() @@ -354,8 +350,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data) } @@ -381,8 +376,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -395,8 +389,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data) } @@ -421,8 +414,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -460,8 +452,7 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "", }, - Protocol: api.ProtocolTCP, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data) } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index 9685898d6bb..f9f09587836 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -308,7 +308,7 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})), CreatedIndex: 1, ModifiedIndex: 2, }, @@ -321,7 +321,7 @@ func TestWatchEtcdState(t *testing.T) { }, From: 1, Expected: []*T{ - {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}}, }, }, "from initial state": { @@ -343,7 +343,7 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})), CreatedIndex: 1, ModifiedIndex: 2, }, @@ -356,7 +356,7 @@ func TestWatchEtcdState(t *testing.T) { }, Expected: []*T{ {watch.Added, nil}, - {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}}, }, }, } diff --git a/test/e2e/service.go b/test/e2e/service.go index 9a9408fca8c..878174c0615 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -250,8 +250,11 @@ var _ = Describe("Services", func() { func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) { ips := util.StringSet{} for _, ep := range endpoints.Endpoints { - if ep.Port != expectedPort { - Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Port)) + if len(ep.Ports) == 0 { + Fail(fmt.Sprintf("invalid endpoint, no ports")) + } + if ep.Ports[0].Port != expectedPort { + Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Ports[0].Port)) } ips.Insert(ep.IP) }