Implement multi-port Endpoints

This is a part of multi-port services.
This commit is contained in:
Tim Hockin 2015-02-21 01:05:18 -08:00
parent e0fd83096c
commit 160f288832
33 changed files with 755 additions and 387 deletions

View File

@ -24,6 +24,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -237,7 +239,13 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
func(ep *api.Endpoint, c fuzz.Continue) {
// TODO: If our API used a particular type for IP fields we could just catch that here.
ep.IP = fmt.Sprintf("%d.%d.%d.%d", c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256))
ep.Port = c.Rand.Intn(65536)
// TODO: Once we drop single-port APIs, make this fuzz
// multiple ports and fuzz port.name. This will force
// a compile error when those APIs are deleted.
_ = v1beta1.Dependency
_ = v1beta2.Dependency
ep.Ports = []api.EndpointPort{{Name: "", Port: c.Rand.Intn(65536)}}
c.Fuzz(&ep.Ports[0].Protocol)
},
)
return f

View File

@ -750,9 +750,6 @@ type Endpoints struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
Endpoints []Endpoint `json:"endpoints,omitempty"`
}
@ -762,8 +759,21 @@ type Endpoint struct {
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip"`
// Required: The destination port to access.
Port int `json:"port"`
// The ports exposed on this IP.
Ports []EndpointPort
}
type EndpointPort struct {
// Optional if only one port is defined in this Endpoint.
// The name of this port within the larger service/endpoint structure.
// This must be a DNS_LABEL.
Name string
// The IP protocol for this port. Supports "TCP" and "UDP".
Protocol Protocol
// The destination port to access.
Port int
}
// EndpointsList is a list of endpoints.

View File

@ -1121,12 +1121,16 @@ func init() {
if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
// newer.Endpoints.Endpoints[i].Ports is an array - take the first one.
if len(ep.Ports) > 0 {
port := &ep.Ports[0]
if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port)))
}
}
return nil
},
@ -1137,22 +1141,20 @@ func init() {
if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
out.Endpoints = append(out.Endpoints, newer.Endpoint{})
ep := &out.Endpoints[i]
host, port, err := net.SplitHostPort(in.Endpoints[i])
if err != nil {
return err
}
ep.IP = host
pn, err := strconv.Atoi(port)
if err != nil {
return err
}
ep.Port = pn
epp := newer.EndpointPort{Port: pn}
if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}})
}
return nil
},

View File

@ -390,41 +390,35 @@ func TestEndpointsConversion(t *testing.T) {
}{
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "empty",
},
Protocol: current.ProtocolTCP,
Protocol: "",
Endpoints: []string{},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "one",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{"1.2.3.4:88"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}},
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}},
},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "several",
},
Protocol: current.ProtocolUDP,
Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolUDP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}},
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}},
},
},
},
}
@ -436,7 +430,7 @@ func TestEndpointsConversion(t *testing.T) {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got)
}

View File

@ -86,7 +86,7 @@ func init() {
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
if obj.Protocol == "" && len(obj.Endpoints) > 0 {
obj.Protocol = "TCP"
}
},

View File

@ -103,11 +103,21 @@ func TestSetDefaultSecret(t *testing.T) {
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) {
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != "" {
t.Errorf("Expected protocol \"\", got %s", out.Protocol)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{Endpoints: []string{"1.2.3.4:5678"}}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
}

View File

@ -24,6 +24,12 @@ import (
// Codec encodes internal objects to the v1beta1 scheme
var Codec = runtime.CodecFor(api.Scheme, "v1beta1")
// Dependency does nothing but give a hook for other packages to force a
// compile-time error when this API version is eventually removed. This is
// useful, for example, to clean up things that are implicitly tied to
// semantics of older APIs.
const Dependency = true
func init() {
api.Scheme.AddKnownTypes("v1beta1",
&Pod{},

View File

@ -1036,12 +1036,16 @@ func init() {
if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
// newer.Endpoints.Endpoints[i].Ports is an array - take the first one.
if len(ep.Ports) > 0 {
port := &ep.Ports[0]
if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port)))
}
}
return nil
},
@ -1052,22 +1056,20 @@ func init() {
if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
out.Endpoints = append(out.Endpoints, newer.Endpoint{})
ep := &out.Endpoints[i]
host, port, err := net.SplitHostPort(in.Endpoints[i])
if err != nil {
return err
}
ep.IP = host
pn, err := strconv.Atoi(port)
if err != nil {
return err
}
ep.Port = pn
epp := newer.EndpointPort{Port: pn}
if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}})
}
return nil
},

View File

@ -220,41 +220,35 @@ func TestEndpointsConversion(t *testing.T) {
}{
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "empty",
},
Protocol: current.ProtocolTCP,
Protocol: "",
Endpoints: []string{},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "one",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{"1.2.3.4:88"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}},
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}},
},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "several",
},
Protocol: current.ProtocolUDP,
Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolUDP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}},
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}},
},
},
},
}
@ -266,7 +260,7 @@ func TestEndpointsConversion(t *testing.T) {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got)
}

View File

@ -88,7 +88,7 @@ func init() {
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
if obj.Protocol == "" && len(obj.Endpoints) > 0 {
obj.Protocol = "TCP"
}
},

View File

@ -103,11 +103,21 @@ func TestSetDefaultSecret(t *testing.T) {
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) {
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != "" {
t.Errorf("Expected protocol \"\", got %s", out.Protocol)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{Endpoints: []string{"1.2.3.4:5678"}}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
}

View File

@ -24,6 +24,12 @@ import (
// Codec encodes internal objects to the v1beta2 scheme
var Codec = runtime.CodecFor(api.Scheme, "v1beta2")
// Dependency does nothing but give a hook for other packages to force a
// compile-time error when this API version is eventually removed. This is
// useful, for example, to clean up things that are implicitly tied to
// semantics of older APIs.
const Dependency = true
func init() {
api.Scheme.AddKnownTypes("v1beta2",
&Pod{},

View File

@ -81,8 +81,14 @@ func init() {
}
},
func(obj *Endpoints) {
if obj.Protocol == "" {
obj.Protocol = "TCP"
for i := range obj.Endpoints {
ep := &obj.Endpoints[i]
for j := range ep.Ports {
port := &ep.Ports[j]
if port.Protocol == "" {
port.Protocol = ProtocolTCP
}
}
}
},
)

View File

@ -104,11 +104,25 @@ func TestSetDefaultSecret(t *testing.T) {
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{}
in := &current.Endpoints{
Endpoints: []current.Endpoint{
{IP: "1.2.3.4", Ports: []current.EndpointPort{
{Protocol: "TCP"},
{Protocol: "UDP"},
{Protocol: ""},
}},
},
}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
if out.Endpoints[0].Ports[0].Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol[0] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[0].Protocol)
}
if out.Endpoints[0].Ports[1].Protocol != current.ProtocolUDP {
t.Errorf("Expected protocol[1] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[1].Protocol)
}
if out.Endpoints[0].Ports[2].Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol[2] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[2].Protocol)
}
}

View File

@ -782,10 +782,6 @@ type Endpoints struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
Endpoints []Endpoint `json:"endpoints,omitempty"`
}
@ -795,6 +791,20 @@ type Endpoint struct {
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip"`
// The ports exposed on this IP.
Ports []EndpointPort `json:"ports,omitempty"`
}
type EndpointPort struct {
// Optional if only one port is defined in this Endpoint, otherwise required.
// The name of this port within the larger service/endpoint structure.
// This must be a DNS_LABEL.
Name string `json:"name,omitempty"`
// Optional: The IP protocol for this port. Supports "TCP" and "UDP".
// Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
// Required: The destination port to access.
Port int `json:"port"`
}

View File

@ -616,7 +616,9 @@ func TestListEndpooints(t *testing.T) {
{
ObjectMeta: api.ObjectMeta{Name: "endpoint-1"},
Endpoints: []api.Endpoint{
{IP: "10.245.1.2", Port: 8080}, {IP: "10.245.1.3", Port: 8080}},
{IP: "10.245.1.2", Ports: []api.EndpointPort{{Port: 8080}}},
{IP: "10.245.1.3", Ports: []api.EndpointPort{{Port: 8080}}},
},
},
},
},

View File

@ -22,10 +22,8 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"reflect"
"sort"
"strconv"
"strings"
"text/tabwriter"
"text/template"
@ -270,10 +268,11 @@ func formatEndpoints(endpoints []api.Endpoint) string {
return "<empty>"
}
list := []string{}
for i := range endpoints {
ep := &endpoints[i]
list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
//FIXME: What do we want to print, now that endpoints are more complex?
//for i := range endpoints {
// ep := &endpoints[i]
// list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
//}
return strings.Join(list, ",")
}

View File

@ -452,7 +452,10 @@ func TestPrinters(t *testing.T) {
"pod": &api.Pod{ObjectMeta: om("pod")},
"emptyPodList": &api.PodList{},
"nonEmptyPodList": &api.PodList{Items: []api.Pod{{}}},
"endpoints": &api.Endpoints{Endpoints: []api.Endpoint{{IP: "127.0.0.1"}, {IP: "localhost", Port: 8080}}},
"endpoints": &api.Endpoints{Endpoints: []api.Endpoint{
{IP: "127.0.0.1"},
{IP: "localhost", Ports: []api.EndpointPort{{Port: 8080}}},
}},
}
// map of printer name to set of objects it should fail on.
expectedErrors := map[string]util.StringSet{

View File

@ -128,25 +128,35 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I
func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) error {
ctx := api.NewDefaultContext()
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
if err != nil || e.Protocol != api.ProtocolTCP {
if err != nil {
e = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: serviceName,
Namespace: api.NamespaceDefault,
},
Protocol: api.ProtocolTCP,
}
}
found := false
FindEndpointLoop:
for i := range e.Endpoints {
ep := &e.Endpoints[i]
if ep.IP == ip.String() && ep.Port == port {
found = true
break
if ep.IP == ip.String() {
for j := range ep.Ports {
epp := &ep.Ports[j]
if epp.Protocol == api.ProtocolTCP && epp.Port == port {
found = true
break FindEndpointLoop
}
}
}
}
if !found {
e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port})
e.Endpoints = append(e.Endpoints, api.Endpoint{
IP: ip.String(),
Ports: []api.EndpointPort{
{Protocol: api.ProtocolTCP, Port: port},
},
})
}
if len(e.Endpoints) > m.masterCount {
// We append to the end and remove from the beginning, so this should

View File

@ -185,7 +185,7 @@ func TestServicesFromZeroError(t *testing.T) {
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}},
}
fakeWatch := watch.NewFake()
@ -235,7 +235,7 @@ func TestEndpoints(t *testing.T) {
func TestEndpointsFromZero(t *testing.T) {
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}},
}
fakeWatch := watch.NewFake()

View File

@ -25,8 +25,8 @@ import (
// LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// service and source address.
NextEndpoint(service string, srcAddr net.Addr) (string, error)
NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service string)
// serviceName:portName and source address.
NextEndpoint(service string, port string, srcAddr net.Addr) (string, error)
NewService(service string, port string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service string, port string)
}

View File

@ -69,7 +69,8 @@ type tcpProxySocket struct {
func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
for _, retryTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
// TODO: support multiple service ports
endpoint, err := proxier.loadBalancer.NextEndpoint(service, "", srcAddr)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err
@ -383,7 +384,8 @@ func (proxier *Proxier) ensurePortals() {
func (proxier *Proxier) cleanupStaleStickySessions() {
for name, info := range proxier.serviceMap {
if info.sessionAffinityType != api.AffinityTypeNone {
proxier.loadBalancer.CleanupStaleStickySessions(name)
// TODO: support multiple service ports
proxier.loadBalancer.CleanupStaleStickySessions(name, "")
}
}
}
@ -499,7 +501,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
if err != nil {
glog.Errorf("Failed to open portal for %q: %v", service.Name, err)
}
proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes)
// TODO: support multiple service ports
proxier.loadBalancer.NewService(service.Name, "", info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
proxier.mu.Lock()
defer proxier.mu.Unlock()

View File

@ -197,7 +197,7 @@ func TestTCPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
},
})
@ -217,7 +217,7 @@ func TestUDPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
},
})
@ -246,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
},
})
@ -277,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
},
})
@ -308,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
},
})
@ -338,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
},
})
@ -368,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
},
})
@ -407,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
},
})
@ -446,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
},
})
@ -482,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
},
})

View File

@ -49,18 +49,26 @@ type affinityPolicy struct {
ttlMinutes int
}
// balancerKey is a string that the balancer uses to key stored state.
// balancerKey is a string that the balancer uses to key stored state. It is
// formatted as "service_name:port_name", but that should be opaque to most consumers.
type balancerKey string
func makeBalancerKey(service, port string) balancerKey {
return balancerKey(fmt.Sprintf("%s:%s", service, port))
}
// LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct {
lock sync.RWMutex
services map[balancerKey]*balancerState
}
// Ensure this implements LoadBalancer.
var _ LoadBalancer = &LoadBalancerRR{}
type balancerState struct {
endpoints []string
index int
endpoints []string // a list of "ip:port" style strings
index int // index into endpoints
affinity affinityPolicy
}
@ -79,20 +87,20 @@ func NewLoadBalancerRR() *LoadBalancerRR {
}
}
func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
func (lb *LoadBalancerRR) NewService(service, port string, affinityType api.AffinityType, ttlMinutes int) error {
lb.lock.Lock()
defer lb.lock.Unlock()
lb.newServiceInternal(service, affinityType, ttlMinutes)
lb.newServiceInternal(service, port, affinityType, ttlMinutes)
return nil
}
func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
func (lb *LoadBalancerRR) newServiceInternal(service, port string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
if ttlMinutes == 0 {
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
}
key := balancerKey(service)
key := makeBalancerKey(service, port)
if _, exists := lb.services[key]; !exists {
lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
@ -111,13 +119,13 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
func (lb *LoadBalancerRR) NextEndpoint(service, port string, srcAddr net.Addr) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
defer lb.lock.Unlock()
key := balancerKey(service)
key := makeBalancerKey(service, port)
state, exists := lb.services[key]
if !exists || state == nil {
return "", ErrMissingServiceEntry
@ -166,18 +174,22 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
return endpoint, nil
}
func isValidEndpoint(ep *api.Endpoint) bool {
return ep.IP != "" && ep.Port > 0
type hostPortPair struct {
host string
port int
}
func filterValidEndpoints(endpoints []api.Endpoint) []string {
// Convert Endpoint objects into strings for easier use later. Ignore
// the protocol field - we'll get that from the Service objects.
func isValidEndpoint(hpp *hostPortPair) bool {
return hpp.host != "" && hpp.port > 0
}
func getValidEndpoints(pairs []hostPortPair) []string {
// Convert structs into strings for easier use later.
var result []string
for i := range endpoints {
ep := &endpoints[i]
if isValidEndpoint(ep) {
result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
for i := range pairs {
hpp := &pairs[i]
if isValidEndpoint(hpp) {
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
}
}
return result
@ -225,27 +237,45 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
defer lb.lock.Unlock()
// Update endpoints for services.
for _, svcEndpoints := range allEndpoints {
key := balancerKey(svcEndpoints.Name)
state, exists := lb.services[key]
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints)
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
lb.updateAffinityMap(key, newEndpoints)
// On update can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist.
state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
for i := range allEndpoints {
svcEndpoints := &allEndpoints[i]
// Reset the round-robin index.
state.index = 0
// We need to build a map of portname -> all ip:ports for that portname.
portsToEndpoints := map[string][]hostPortPair{}
// Explode the Endpoints.Endpoints[*].Ports[*] into the aforementioned map.
// FIXME: this is awkward. Maybe a different factoring of Endpoints is better?
for j := range svcEndpoints.Endpoints {
ep := &svcEndpoints.Endpoints[j]
for k := range ep.Ports {
epp := &ep.Ports[k]
portsToEndpoints[epp.Name] = append(portsToEndpoints[epp.Name], hostPortPair{ep.IP, epp.Port})
// Ignore the protocol field - we'll get that from the Service objects.
}
}
for portname := range portsToEndpoints {
key := makeBalancerKey(svcEndpoints.Name, portname)
state, exists := lb.services[key]
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := getValidEndpoints(portsToEndpoints[portname])
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
lb.updateAffinityMap(key, newEndpoints)
// On update can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist.
state = lb.newServiceInternal(svcEndpoints.Name, portname, api.AffinityTypeNone, 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
// Reset the round-robin index.
state.index = 0
}
registeredEndpoints[key] = true
}
registeredEndpoints[key] = true
}
// Remove endpoints missing from the update.
for k := range lb.services {
@ -267,11 +297,11 @@ func slicesEquiv(lhs, rhs []string) bool {
return false
}
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service, port string) {
lb.lock.Lock()
defer lb.lock.Unlock()
key := balancerKey(service)
key := makeBalancerKey(service, port)
state, exists := lb.services[key]
if !exists {
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)

View File

@ -24,29 +24,29 @@ import (
)
func TestValidateWorks(t *testing.T) {
if isValidEndpoint(&api.Endpoint{}) {
if isValidEndpoint(&hostPortPair{}) {
t.Errorf("Didn't fail for empty string")
}
if isValidEndpoint(&api.Endpoint{IP: "foobar"}) {
if isValidEndpoint(&hostPortPair{host: "foobar"}) {
t.Errorf("Didn't fail with no port")
}
if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) {
if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
t.Errorf("Didn't fail with a negative port")
}
if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) {
if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
t.Errorf("Failed a valid config.")
}
}
func TestFilterWorks(t *testing.T) {
endpoints := []api.Endpoint{
{IP: "foobar", Port: 1},
{IP: "foobar", Port: 2},
{IP: "foobar", Port: -1},
{IP: "foobar", Port: 3},
{IP: "foobar", Port: -2},
endpoints := []hostPortPair{
{host: "foobar", port: 1},
{host: "foobar", port: 2},
{host: "foobar", port: -1},
{host: "foobar", port: 3},
{host: "foobar", port: -2},
}
filtered := filterValidEndpoints(endpoints)
filtered := getValidEndpoints(endpoints)
if len(filtered) != 3 {
t.Errorf("Failed to filter to the correct size")
@ -66,7 +66,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "http", nil)
if err == nil {
t.Errorf("Didn't fail with non-existent service")
}
@ -75,8 +75,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
}
}
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, port string, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, port, netaddr)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
}
@ -87,25 +87,41 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string,
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}},
Endpoints: []api.Endpoint{{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 40}}}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
}
func stringsInSlice(haystack []string, needles ...string) bool {
for _, needle := range needles {
found := false
for i := range haystack {
if haystack[i] == needle {
found = true
break
}
}
if found == false {
return false
}
}
return true
}
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@ -113,22 +129,77 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsAndPorts(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 1},
{Name: "q", Port: 10},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 2},
{Name: "q", Port: 20},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 3},
{Name: "q", Port: 30},
}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@ -136,37 +207,86 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 1},
{Name: "q", Port: 10},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 2},
{Name: "q", Port: 20},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 3},
{Name: "q", Port: 30},
}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 8},
{IP: "endpoint", Port: 9},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 8},
{Name: "q", Port: 80},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 9},
{Name: "q", Port: 90},
}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:8", "endpoint:9") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:80", "endpoint:90") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoint, err = loadBalancer.NextEndpoint("foo", "q", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@ -174,7 +294,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@ -182,133 +302,183 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 1},
{Name: "q", Port: 10},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 2},
{Name: "q", Port: 20},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 3},
{Name: "q", Port: 30},
}},
},
}
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 4},
{Name: "q", Port: 40},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 5},
{Name: "q", Port: 50},
}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledFooEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil)
shuffledFooEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledFooEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") {
t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints
if !stringsInSlice(shuffledBarEndpoints, "endpoint:4", "endpoint:5") {
t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints)
}
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil)
shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "q")].endpoints
if !stringsInSlice(shuffledBarEndpoints, "endpoint:40", "endpoint:50") {
t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints)
}
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
}
func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
func TestStickyLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeNone, 0)
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
func TestStickyLoadBalanceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", "p", api.AffinityTypeNone, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
@ -316,41 +486,44 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
client2Endpoint := shuffledEndpoints[1]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
client3Endpoint := shuffledEndpoints[2]
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0]
} else if client2Endpoint == "endpoint:3" {
@ -358,26 +531,26 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
} else if client3Endpoint == "endpoint:3" {
client3Endpoint = shuffledEndpoints[0]
}
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 4},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client6)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
@ -385,51 +558,54 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@ -440,58 +616,58 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
},
}
loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("bar", "p", api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 5},
{IP: "endpoint", Port: 5},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 6}}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
shuffledBarEndpoints = loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "p")].endpoints
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
}

View File

@ -27,10 +27,20 @@ import (
)
func TestGetEndpoints(t *testing.T) {
expected := []api.Endpoint{
{IP: "127.0.0.1", Ports: []api.EndpointPort{
{Name: "p", Port: 9000, Protocol: api.ProtocolTCP},
{Name: "q", Port: 9000, Protocol: api.ProtocolUDP},
}},
{IP: "127.0.0.2", Ports: []api.EndpointPort{
{Name: "p", Port: 8000, Protocol: api.ProtocolTCP},
{Name: "q", Port: 8000, Protocol: api.ProtocolUDP},
}},
}
registry := &registrytest.ServiceRegistry{
Endpoints: api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
Endpoints: expected,
},
}
storage := NewREST(registry)
@ -39,7 +49,7 @@ func TestGetEndpoints(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %#v", err)
}
if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) {
if !reflect.DeepEqual(expected, obj.(*api.Endpoints).Endpoints) {
t.Errorf("unexpected endpoints: %#v", obj)
}
}

View File

@ -24,6 +24,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@ -598,10 +600,10 @@ func TestEtcdListEndpoints(t *testing.T) {
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}),
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Name: "p", Port: 8345, Protocol: api.ProtocolTCP}}}}}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}),
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}),
},
},
},
@ -625,8 +627,7 @@ func TestEtcdGetEndpoints(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient)
endpoints := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 34855, Protocol: api.ProtocolTCP}}}},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")
@ -647,10 +648,19 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
registry := NewTestEtcdRegistry(fakeClient)
// TODO: Once we drop single-port APIs, make this test use the
// multi-port features. This will force a compile error when those APIs
// are deleted.
_ = v1beta1.Dependency
_ = v1beta2.Dependency
endpoints := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}},
Endpoints: []api.Endpoint{
{IP: "baz", Ports: []api.EndpointPort{{Port: 1, Protocol: api.ProtocolTCP}}},
{IP: "bar", Ports: []api.EndpointPort{{Port: 2, Protocol: api.ProtocolTCP}}},
},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")

View File

@ -21,6 +21,7 @@ import (
"math/rand"
"net"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -218,17 +219,57 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
eps, err := rs.registry.GetEndpoints(ctx, id)
// Allow ID as "svcname" or "svcname:port". Choose an endpoint at
// random. If the port is specified as a number, use that value
// directly. If the port is specified as a name, try to look up that
// name on the chosen endpoint. If port is not specified, try to use
// the first unnamed port on the chosen endpoint. If there are no
// unnamed ports, try to use the first defined port.
parts := strings.Split(id, ":")
if len(parts) > 2 {
return "", errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
}
name := parts[0]
port := ""
if len(parts) == 2 {
port = parts[1]
}
eps, err := rs.registry.GetEndpoints(ctx, name)
if err != nil {
return "", err
}
if len(eps.Endpoints) == 0 {
return "", fmt.Errorf("no endpoints available for %v", id)
return "", fmt.Errorf("no endpoints available for %v", name)
}
ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
// Try to figure out a port.
if _, err := strconv.Atoi(port); err != nil {
// Do nothing - port is correct as is.
} else {
// Try a name lookup, even if name is "".
for i := range ep.Ports {
if ep.Ports[i].Name == port {
port = strconv.Itoa(ep.Ports[i].Port)
break
}
}
}
if port == "" {
// Still nothing - try the first defined port.
if len(ep.Ports) > 0 {
port = strconv.Itoa(ep.Ports[0].Port)
}
}
// We leave off the scheme ('http://') because we have no idea what sort of server
// is listening at this endpoint.
ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil
loc := ep.IP
if port != "" {
loc += fmt.Sprintf(":%s", port)
}
return loc, nil
}
func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {

View File

@ -370,7 +370,7 @@ func TestServiceRegistryGet(t *testing.T) {
func TestServiceRegistryResourceLocation(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}}
registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Ports: []api.EndpointPort{{Port: 80}}}}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))

View File

@ -87,7 +87,11 @@ func (e *EndpointController) SyncServiceEndpoints() error {
continue
}
endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port})
// TODO: Add multiple-ports to Service and expose them here.
endpoints = append(endpoints, api.Endpoint{
IP: pod.Status.PodIP,
Ports: []api.EndpointPort{{Name: "", Protocol: service.Spec.Protocol, Port: port}},
})
}
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
@ -96,7 +100,6 @@ func (e *EndpointController) SyncServiceEndpoints() error {
ObjectMeta: api.ObjectMeta{
Name: service.Name,
},
Protocol: service.Spec.Protocol,
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
@ -112,7 +115,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
} else {
// Pre-existing
if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) {
if endpointsEqual(currentEndpoints, endpoints) {
glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
continue
}
@ -126,12 +129,27 @@ func (e *EndpointController) SyncServiceEndpoints() error {
return resultErr
}
// TODO: It would be nice if we had a util function that reflectively compared
// two slices for order-insensitive equivalence.
func portsEqual(lhs, rhs []api.EndpointPort) bool {
if len(lhs) != len(rhs) {
return false
}
for i := range lhs {
if lhs[i] != rhs[i] {
return false
}
}
return true
}
func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool {
if haystack == nil || needle == nil {
return false
}
for ix := range haystack.Endpoints {
if haystack.Endpoints[ix] == *needle {
haystackEP := &haystack.Endpoints[ix]
if haystackEP.IP == needle.IP && portsEqual(haystackEP.Ports, needle.Ports) {
return true
}
}

View File

@ -245,8 +245,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -277,8 +276,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -309,8 +307,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolUDP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolUDP, Port: 1000}}}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -340,7 +337,6 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{},
}})
defer testServer.Close()
@ -354,8 +350,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data)
}
@ -381,8 +376,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -395,8 +389,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data)
}
@ -421,8 +414,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@ -460,8 +452,7 @@ func TestSyncEndpointsItems(t *testing.T) {
ObjectMeta: api.ObjectMeta{
ResourceVersion: "",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data)
}

View File

@ -308,7 +308,7 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
@ -321,7 +321,7 @@ func TestWatchEtcdState(t *testing.T) {
},
From: 1,
Expected: []*T{
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}},
},
},
"from initial state": {
@ -343,7 +343,7 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
@ -356,7 +356,7 @@ func TestWatchEtcdState(t *testing.T) {
},
Expected: []*T{
{watch.Added, nil},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}},
},
},
}

View File

@ -250,8 +250,11 @@ var _ = Describe("Services", func() {
func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) {
ips := util.StringSet{}
for _, ep := range endpoints.Endpoints {
if ep.Port != expectedPort {
Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Port))
if len(ep.Ports) == 0 {
Fail(fmt.Sprintf("invalid endpoint, no ports"))
}
if ep.Ports[0].Port != expectedPort {
Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Ports[0].Port))
}
ips.Insert(ep.IP)
}