Implement multi-port endpoints

Instead of endpoints being a flat list, it is now a list of "subsets"
where each is a struct of {Addresses, Ports}.  To generate the list of
endpoints you need to take union of the Cartesian products of the
subsets.  This is compact in the vast majority of cases, yet still
represents named ports and corner cases (e.g. each pod has a different
port number).

This also stores subsets in a deterministic order (sorted by hash) to
avoid spurious updates and comparison problems.

This is a fully compatible change - old objects and clients will
keepworking as long as they don't need the new functionality.

This is the prep for multi-port Services, which will add API to produce
endpoints in this new structure.
This commit is contained in:
Tim Hockin
2015-03-20 14:24:43 -07:00
parent 3eda80b3c5
commit 8ae203825b
43 changed files with 2090 additions and 1030 deletions

View File

@@ -185,7 +185,10 @@ func TestServicesFromZeroError(t *testing.T) {
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: 9000}},
}},
}
fakeWatch := watch.NewFake()
@@ -235,7 +238,10 @@ func TestEndpoints(t *testing.T) {
func TestEndpointsFromZero(t *testing.T) {
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: 9000}},
}},
}
fakeWatch := watch.NewFake()

View File

@@ -218,11 +218,17 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
handler.Wait(2)
handler2.Wait(2)
@@ -244,11 +250,17 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
handler.Wait(2)
handler2.Wait(2)
@@ -262,7 +274,10 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
handler.Wait(1)
handler2.Wait(1)
@@ -274,7 +289,10 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint7"}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "7.7.7.7"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
handler.Wait(1)
handler2.Wait(1)

View File

@@ -26,8 +26,8 @@ import (
// LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// service and source address.
NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error)
NewService(service types.NamespacedName, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service types.NamespacedName)
// service-port and source address.
NextEndpoint(service types.NamespacedName, port string, srcAddr net.Addr) (string, error)
NewService(service types.NamespacedName, port string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service types.NamespacedName, port string)
}

View File

@@ -70,7 +70,8 @@ type tcpProxySocket struct {
func tryConnect(service types.NamespacedName, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
for _, retryTimeout := range endpointDialTimeout {
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
// TODO: support multiple service ports
endpoint, err := proxier.loadBalancer.NextEndpoint(service, "", srcAddr)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err
@@ -388,7 +389,8 @@ func (proxier *Proxier) ensurePortals() {
func (proxier *Proxier) cleanupStaleStickySessions() {
for name, info := range proxier.serviceMap {
if info.sessionAffinityType != api.AffinityTypeNone {
proxier.loadBalancer.CleanupStaleStickySessions(name)
// TODO: support multiple service ports
proxier.loadBalancer.CleanupStaleStickySessions(name, "")
}
}
}
@@ -509,7 +511,8 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
if err != nil {
glog.Errorf("Failed to open portal for %q: %v", serviceName, err)
}
proxier.loadBalancer.NewService(serviceName, info.sessionAffinityType, info.stickyMaxAgeMinutes)
// TODO: support multiple service ports
proxier.loadBalancer.NewService(serviceName, "", info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
proxier.mu.Lock()
defer proxier.mu.Unlock()

View File

@@ -199,7 +199,10 @@ func TestTCPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
}},
},
})
@@ -220,7 +223,10 @@ func TestUDPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
}},
},
})
@@ -250,7 +256,10 @@ func TestTCPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
}},
},
})
@@ -282,7 +291,10 @@ func TestUDPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
}},
},
})
@@ -314,7 +326,10 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
}},
},
})
@@ -345,7 +360,10 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
}},
},
})
@@ -376,7 +394,10 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
}},
},
})
@@ -416,7 +437,10 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
}},
},
})
@@ -456,7 +480,10 @@ func TestTCPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
}},
},
})
@@ -493,7 +520,10 @@ func TestUDPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: udpServerPort}},
}},
},
})
@@ -527,7 +557,10 @@ func TestProxyUpdatePortal(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []api.EndpointPort{{Port: tcpServerPort}},
}},
},
})

View File

@@ -50,15 +50,28 @@ type affinityPolicy struct {
ttlMinutes int
}
// servicePort is the type that the balancer uses to key stored state.
type servicePort struct {
types.NamespacedName
port string
}
func (sp servicePort) String() string {
return fmt.Sprintf("%s:%s", sp.NamespacedName, sp.port)
}
// LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct {
lock sync.RWMutex
services map[types.NamespacedName]*balancerState
services map[servicePort]*balancerState
}
// Ensure this implements LoadBalancer.
var _ LoadBalancer = &LoadBalancerRR{}
type balancerState struct {
endpoints []string
index int
endpoints []string // a list of "ip:port" style strings
index int // current index into endpoints
affinity affinityPolicy
}
@@ -73,29 +86,30 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP
// NewLoadBalancerRR returns a new LoadBalancerRR.
func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{
services: map[types.NamespacedName]*balancerState{},
services: map[servicePort]*balancerState{},
}
}
func (lb *LoadBalancerRR) NewService(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) error {
func (lb *LoadBalancerRR) NewService(service types.NamespacedName, port string, affinityType api.AffinityType, ttlMinutes int) error {
svcPort := servicePort{service, port}
lb.lock.Lock()
defer lb.lock.Unlock()
lb.newServiceInternal(service, affinityType, ttlMinutes)
lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
return nil
}
// This assumes that lb.lock is already held.
func (lb *LoadBalancerRR) newServiceInternal(service types.NamespacedName, affinityType api.AffinityType, ttlMinutes int) *balancerState {
func (lb *LoadBalancerRR) newServiceInternal(svcPort servicePort, affinityType api.AffinityType, ttlMinutes int) *balancerState {
if ttlMinutes == 0 {
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
}
if _, exists := lb.services[service]; !exists {
lb.services[service] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
if _, exists := lb.services[svcPort]; !exists {
lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
}
return lb.services[service]
return lb.services[svcPort]
}
// return true if this service is using some form of session affinity.
@@ -109,21 +123,22 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error) {
func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, port string, srcAddr net.Addr) (string, error) {
svcPort := servicePort{service, port}
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
defer lb.lock.Unlock()
key := service
state, exists := lb.services[key]
state, exists := lb.services[svcPort]
if !exists || state == nil {
return "", ErrMissingServiceEntry
}
if len(state.endpoints) == 0 {
return "", ErrMissingEndpoints
}
glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints)
glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints)
sessionAffinityEnabled := isSessionAffinity(&state.affinity)
@@ -140,7 +155,7 @@ func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, srcAddr net
// Affinity wins.
endpoint := sessionAffinity.endpoint
sessionAffinity.lastUsed = time.Now()
glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint)
glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", svcPort, ipaddr, sessionAffinity, endpoint)
return endpoint, nil
}
}
@@ -164,28 +179,33 @@ func (lb *LoadBalancerRR) NextEndpoint(service types.NamespacedName, srcAddr net
return endpoint, nil
}
func isValidEndpoint(ep *api.Endpoint) bool {
return ep.IP != "" && ep.Port > 0
type hostPortPair struct {
host string
port int
}
func filterValidEndpoints(endpoints []api.Endpoint) []string {
func isValidEndpoint(hpp *hostPortPair) bool {
return hpp.host != "" && hpp.port > 0
}
func flattenValidEndpoints(endpoints []hostPortPair) []string {
// Convert Endpoint objects into strings for easier use later. Ignore
// the protocol field - we'll get that from the Service objects.
var result []string
for i := range endpoints {
ep := &endpoints[i]
if isValidEndpoint(ep) {
result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
hpp := &endpoints[i]
if isValidEndpoint(hpp) {
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
}
}
return result
}
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(state *balancerState, service types.NamespacedName, endpoint string) {
func removeSessionAffinityByEndpoint(state *balancerState, svcPort servicePort, endpoint string) {
for _, affinity := range state.affinity.affinityMap {
if affinity.endpoint == endpoint {
glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort)
delete(state.affinity.affinityMap, affinity.clientIP)
}
}
@@ -194,12 +214,12 @@ func removeSessionAffinityByEndpoint(state *balancerState, service types.Namespa
// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
// Then remove any session affinity records that are not in both lists.
// This assumes the lb.lock is held.
func (lb *LoadBalancerRR) updateAffinityMap(service types.NamespacedName, newEndpoints []string) {
func (lb *LoadBalancerRR) updateAffinityMap(svcPort servicePort, newEndpoints []string) {
allEndpoints := map[string]int{}
for _, newEndpoint := range newEndpoints {
allEndpoints[newEndpoint] = 1
}
state, exists := lb.services[service]
state, exists := lb.services[svcPort]
if !exists {
return
}
@@ -208,8 +228,8 @@ func (lb *LoadBalancerRR) updateAffinityMap(service types.NamespacedName, newEnd
}
for mKey, mVal := range allEndpoints {
if mVal == 1 {
glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service)
removeSessionAffinityByEndpoint(state, service, mKey)
glog.V(3).Infof("Delete endpoint %s for service %q", mKey, svcPort)
removeSessionAffinityByEndpoint(state, svcPort, mKey)
}
}
}
@@ -218,33 +238,52 @@ func (lb *LoadBalancerRR) updateAffinityMap(service types.NamespacedName, newEnd
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[types.NamespacedName]bool)
registeredEndpoints := make(map[servicePort]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
// Update endpoints for services.
for _, svcEndpoints := range allEndpoints {
name := types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}
key := name
state, exists := lb.services[key]
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints)
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
lb.updateAffinityMap(key, newEndpoints)
// On update can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist.
state = lb.newServiceInternal(name, api.AffinityTypeNone, 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
for i := range allEndpoints {
svcEndpoints := &allEndpoints[i]
// Reset the round-robin index.
state.index = 0
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
portsToEndpoints := map[string][]hostPortPair{}
for i := range svcEndpoints.Subsets {
ss := &svcEndpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, port.Port})
// Ignore the protocol field - we'll get that from the Service objects.
}
}
}
for portname := range portsToEndpoints {
svcPort := servicePort{types.NamespacedName{svcEndpoints.Namespace, svcEndpoints.Name}, portname}
state, exists := lb.services[svcPort]
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints)
// OnUpdate can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// if one does not already exist.
state = lb.newServiceInternal(svcPort, api.AffinityTypeNone, 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
// Reset the round-robin index.
state.index = 0
}
registeredEndpoints[svcPort] = true
}
registeredEndpoints[key] = true
}
// Remove endpoints missing from the update.
for k := range lb.services {
@@ -266,19 +305,20 @@ func slicesEquiv(lhs, rhs []string) bool {
return false
}
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service types.NamespacedName) {
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service types.NamespacedName, port string) {
svcPort := servicePort{service, port}
lb.lock.Lock()
defer lb.lock.Unlock()
key := service
state, exists := lb.services[key]
state, exists := lb.services[svcPort]
if !exists {
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", svcPort)
return
}
for ip, affinity := range state.affinity.affinityMap {
if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes {
glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service)
glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort)
delete(state.affinity.affinityMap, ip)
}
}

View File

@@ -25,29 +25,29 @@ import (
)
func TestValidateWorks(t *testing.T) {
if isValidEndpoint(&api.Endpoint{}) {
t.Errorf("Didn't fail for empty string")
if isValidEndpoint(&hostPortPair{}) {
t.Errorf("Didn't fail for empty set")
}
if isValidEndpoint(&api.Endpoint{IP: "foobar"}) {
t.Errorf("Didn't fail with no port")
if isValidEndpoint(&hostPortPair{host: "foobar"}) {
t.Errorf("Didn't fail with invalid port")
}
if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) {
if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
t.Errorf("Didn't fail with a negative port")
}
if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) {
if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
t.Errorf("Failed a valid config.")
}
}
func TestFilterWorks(t *testing.T) {
endpoints := []api.Endpoint{
{IP: "foobar", Port: 1},
{IP: "foobar", Port: 2},
{IP: "foobar", Port: -1},
{IP: "foobar", Port: 3},
{IP: "foobar", Port: -2},
endpoints := []hostPortPair{
{host: "foobar", port: 1},
{host: "foobar", port: 2},
{host: "foobar", port: -1},
{host: "foobar", port: 3},
{host: "foobar", port: -2},
}
filtered := filterValidEndpoints(endpoints)
filtered := flattenValidEndpoints(endpoints)
if len(filtered) != 3 {
t.Errorf("Failed to filter to the correct size")
@@ -68,7 +68,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints)
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "does-not-exist", nil)
if err == nil {
t.Errorf("Didn't fail with non-existent service")
}
@@ -77,101 +77,208 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
}
}
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service types.NamespacedName, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service types.NamespacedName, port string, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, port, netaddr)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
t.Errorf("Didn't find a service for %s:%s, expected %s, failed with: %v", service, port, expected, err)
}
if endpoint != expected {
t.Errorf("Didn't get expected endpoint for service %s client %v, expected %s, got: %s", service, netaddr, expected, endpoint)
t.Errorf("Didn't get expected endpoint for service %s:%s client %v, expected %s, got: %s", service, port, netaddr, expected, endpoint)
}
}
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}},
Ports: []api.EndpointPort{{Name: "p", Port: 40}},
}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, service, "p", "endpoint1:40", nil)
}
func stringsInSlice(haystack []string, needles ...string) bool {
for _, needle := range needles {
found := false
for i := range haystack {
if haystack[i] == needle {
found = true
break
}
}
if found == false {
return false
}
}
return true
}
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}},
}},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[servicePort{service, "p"}].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}},
Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 2}},
},
{
Addresses: []api.EndpointAddress{{IP: "endpoint3"}},
Ports: []api.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 4}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
shuffledEndpoints := loadBalancer.services[servicePort{service, "p"}].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
shuffledEndpoints = loadBalancer.services[servicePort{service, "q"}].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:2", "endpoint2:2", "endpoint3:4") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}},
Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "q", Port: 10}},
},
{
Addresses: []api.EndpointAddress{{IP: "endpoint2"}},
Ports: []api.EndpointPort{{Name: "p", Port: 2}, {Name: "q", Port: 20}},
},
{
Addresses: []api.EndpointAddress{{IP: "endpoint3"}},
Ports: []api.EndpointPort{{Name: "p", Port: 3}, {Name: "q", Port: 30}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 8},
{IP: "endpoint", Port: 9},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, nil)
shuffledEndpoints := loadBalancer.services[servicePort{service, "p"}].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
shuffledEndpoints = loadBalancer.services[servicePort{service, "q"}].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint1:10", "endpoint2:20", "endpoint3:30") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint4"}},
Ports: []api.EndpointPort{{Name: "p", Port: 4}, {Name: "q", Port: 40}},
},
{
Addresses: []api.EndpointAddress{{IP: "endpoint5"}},
Ports: []api.EndpointPort{{Name: "p", Port: 5}, {Name: "q", Port: 50}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[servicePort{service, "p"}].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "p", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[servicePort{service, "q"}].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint4:40", "endpoint5:50") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, service, "q", shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -181,53 +288,56 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
endpoint, err := loadBalancer.NextEndpoint(fooService, "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint1"}, {IP: "endpoint2"}, {IP: "endpoint3"}},
Ports: []api.EndpointPort{{Name: "p", Port: 123}},
},
},
}
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint4"}, {IP: "endpoint5"}, {IP: "endpoint6"}},
Ports: []api.EndpointPort{{Name: "p", Port: 456}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], nil)
shuffledFooEndpoints := loadBalancer.services[servicePort{fooService, "p"}].endpoints
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, fooService, "p", shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
shuffledBarEndpoints := loadBalancer.services[servicePort{barService, "p"}].endpoints
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[2], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[1], nil)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
endpoint, err = loadBalancer.NextEndpoint(fooService, "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[2], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, barService, "p", shuffledBarEndpoints[2], nil)
}
func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
@@ -235,21 +345,21 @@ func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}},
Subsets: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "endpoint"}}, Ports: []api.EndpointPort{{Port: 1}}}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, service, "", "endpoint:1", client2)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
@@ -258,31 +368,32 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
@@ -291,32 +402,33 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, api.AffinityTypeNone, 0)
loadBalancer.NewService(service, "", api.AffinityTypeNone, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client1)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client1)
}
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
@@ -328,41 +440,44 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0]
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
client2Endpoint := shuffledEndpoints[1]
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
client3Endpoint := shuffledEndpoints[2]
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[service].endpoints
shuffledEndpoints = loadBalancer.services[servicePort{service, ""}].endpoints
if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0]
} else if client2Endpoint == "endpoint:3" {
@@ -370,26 +485,27 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
} else if client3Endpoint == "endpoint:3" {
client3Endpoint = shuffledEndpoints[0]
}
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, "", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, "", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, "", client3Endpoint, client3)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 4},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 4}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client6)
shuffledEndpoints = loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, service, "", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, service, "", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client6)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
@@ -398,51 +514,56 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
service := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(service, nil)
endpoint, err := loadBalancer.NextEndpoint(service, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(service, api.AffinityTypeClientIP, 0)
loadBalancer.NewService(service, "", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
shuffledEndpoints := loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 4}, {Port: 5}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[service].endpoints
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2)
shuffledEndpoints = loadBalancer.services[servicePort{service, ""}].endpoints
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, service, "", shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Endpoints: []api.Endpoint{}}
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint(service, nil)
endpoint, err = loadBalancer.NextEndpoint(service, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -454,60 +575,66 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
fooService := types.NewNamespacedNameOrDie("testnamespace", "foo")
endpoint, err := loadBalancer.NextEndpoint(fooService, nil)
endpoint, err := loadBalancer.NextEndpoint(fooService, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService(fooService, api.AffinityTypeClientIP, 0)
loadBalancer.NewService(fooService, "", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 1}, {Port: 2}, {Port: 3}},
},
},
}
barService := types.NewNamespacedNameOrDie("testnamespace", "bar")
loadBalancer.NewService(barService, api.AffinityTypeClientIP, 0)
loadBalancer.NewService(barService, "", api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace},
Endpoints: []api.Endpoint{
{IP: "endpoint", Port: 5},
{IP: "endpoint", Port: 5},
Subsets: []api.EndpointSubset{
{
Addresses: []api.EndpointAddress{{IP: "endpoint"}},
Ports: []api.EndpointPort{{Port: 4}, {Port: 5}},
},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services[fooService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
shuffledFooEndpoints := loadBalancer.services[servicePort{fooService, ""}].endpoints
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, fooService, "", shuffledFooEndpoints[2], client3)
shuffledBarEndpoints := loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1)
shuffledBarEndpoints := loadBalancer.services[servicePort{barService, ""}].endpoints
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint(fooService, nil)
endpoint, err = loadBalancer.NextEndpoint(fooService, "", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
shuffledBarEndpoints = loadBalancer.services[barService].endpoints
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[0], client1)
shuffledBarEndpoints = loadBalancer.services[servicePort{barService, ""}].endpoints
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, barService, "", shuffledBarEndpoints[0], client1)
}