Refactor load balancer.

Flatten 3 parallel maps into a single map of structs (balancerState).
Tim Hockin 2015-02-21 22:19:50 -08:00
parent 743888739d
commit 362106d8eb
2 changed files with 114 additions and 81 deletions
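In outline: where the balancer previously kept three maps keyed by service that had to be updated in lockstep, it now keeps one map whose values carry all per-service state. A minimal sketch of the two layouts (type and field names follow the diff below; affinityPolicy is reduced to a stub):

package sketch

import "sync"

type balancerKey string
type affinityPolicy struct{} // stub; stands in for the real session-affinity config

// Before: three parallel maps, all keyed by service. Every code path had
// to index them consistently, and removing a service meant three deletes.
type loadBalancerBefore struct {
	lock          sync.RWMutex
	endpointsMap  map[balancerKey][]string       // service -> endpoints
	rrIndex       map[balancerKey]int            // service -> next round-robin index
	serviceDtlMap map[balancerKey]affinityPolicy // service -> affinity policy
}

// After: a single lookup yields everything for a service, and an entry
// is created or deleted atomically.
type loadBalancerAfter struct {
	lock     sync.RWMutex
	services map[balancerKey]*balancerState
}

type balancerState struct {
	endpoints []string
	index     int
	affinity  affinityPolicy
}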


@@ -18,6 +18,7 @@ package proxy
 import (
 	"errors"
+	"fmt"
 	"net"
 	"reflect"
 	"strconv"
@@ -54,9 +55,13 @@ type balancerKey string
 // LoadBalancerRR is a round-robin load balancer.
 type LoadBalancerRR struct {
-	lock          sync.RWMutex
-	endpointsMap  map[balancerKey][]string
-	rrIndex       map[balancerKey]int
-	serviceDtlMap map[balancerKey]affinityPolicy
+	lock     sync.RWMutex
+	services map[balancerKey]*balancerState
+}
+
+type balancerState struct {
+	endpoints []string
+	index     int
+	affinity  affinityPolicy
 }
 
 func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy {
@@ -70,9 +75,7 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP
 // NewLoadBalancerRR returns a new LoadBalancerRR.
 func NewLoadBalancerRR() *LoadBalancerRR {
 	return &LoadBalancerRR{
-		endpointsMap:  make(map[balancerKey][]string),
-		rrIndex:       make(map[balancerKey]int),
-		serviceDtlMap: make(map[balancerKey]affinityPolicy),
+		services: map[balancerKey]*balancerState{},
 	}
 }
 
@@ -84,19 +87,22 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy
 	return nil
 }
 
-func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) {
+func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
 	if ttlMinutes == 0 {
 		ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
 	}
-	if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists {
-		lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes)
-		glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)])
+
+	key := balancerKey(service)
+	if _, exists := lb.services[key]; !exists {
+		lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
+		glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
 	}
+	return lb.services[key]
 }
 
 // return true if this service is using some form of session affinity.
 func isSessionAffinity(affinity *affinityPolicy) bool {
-	//Should never be empty string, but chekcing for it to be safe.
+	// Should never be empty string, but checking for it to be safe.
 	if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone {
 		return false
 	}
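newServiceInternal becomes a get-or-create that returns the entry, so callers such as OnUpdate can use the returned *balancerState directly instead of re-indexing the map. The idiom in isolation, as a reduced sketch (getOrCreate is a hypothetical name; the caller is assumed to hold lb.lock):

func (lb *LoadBalancerRR) getOrCreate(key balancerKey) *balancerState {
	state, exists := lb.services[key]
	if !exists {
		state = &balancerState{} // the real code seeds the affinity policy here
		lb.services[key] = state
	}
	return state
}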
@@ -106,54 +112,57 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
 // NextEndpoint returns a service endpoint.
 // The service endpoint is chosen using the round-robin algorithm.
 func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
-	var ipaddr string
-	glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap)
+	// Coarse locking is simple. We can get more fine-grained if/when we
+	// can prove it matters.
+	lb.lock.Lock()
+	defer lb.lock.Unlock()
 
-	lb.lock.RLock()
-	serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)]
-	endpoints, _ := lb.endpointsMap[balancerKey(service)]
-	index := lb.rrIndex[balancerKey(service)]
-	sessionAffinityEnabled := isSessionAffinity(&serviceDtls)
-	lb.lock.RUnlock()
-	if !exists {
+	key := balancerKey(service)
+	state, exists := lb.services[key]
+	if !exists || state == nil {
 		return "", ErrMissingServiceEntry
 	}
-	if len(endpoints) == 0 {
+	if len(state.endpoints) == 0 {
 		return "", ErrMissingEndpoints
 	}
+	glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints)
+
+	sessionAffinityEnabled := isSessionAffinity(&state.affinity)
+
+	var ipaddr string
 	if sessionAffinityEnabled {
-		if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil {
-			ipaddr, _, _ = net.SplitHostPort(srcAddr.String())
+		// Caution: don't shadow ipaddr
+		var err error
+		ipaddr, _, err = net.SplitHostPort(srcAddr.String())
+		if err != nil {
+			return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
 		}
-		sessionAffinity, exists := serviceDtls.affinityMap[ipaddr]
-		glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
-		if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < serviceDtls.ttlMinutes {
+		sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
+		if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
+			// Affinity wins.
 			endpoint := sessionAffinity.endpoint
 			sessionAffinity.lastUsed = time.Now()
-			glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
+			glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint)
 			return endpoint, nil
 		}
 	}
-	endpoint := endpoints[index]
-	lb.lock.Lock()
-	lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints)
+	// Take the next endpoint.
+	endpoint := state.endpoints[state.index]
+	state.index = (state.index + 1) % len(state.endpoints)
+
 	if sessionAffinityEnabled {
 		var affinity *affinityState
-		affinity, _ = lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr]
+		affinity = state.affinity.affinityMap[ipaddr]
 		if affinity == nil {
 			affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
-			lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] = affinity
+			state.affinity.affinityMap[ipaddr] = affinity
 		}
 		affinity.lastUsed = time.Now()
 		affinity.endpoint = endpoint
 		affinity.clientIP = ipaddr
-		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr])
+		glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr])
 	}
-	lb.lock.Unlock()
+
 	return endpoint, nil
 }
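The selection order in the rewritten NextEndpoint: a session-affinity record younger than the TTL wins outright; otherwise the next endpoint in the (pre-shuffled) rotation is taken and, when affinity is on, recorded for that client. The core of that logic in isolation, as a hypothetical simplified sketch (no locking, affinity always on, TTL as a time.Duration):

package sketch

import "time"

type affinityEntry struct {
	endpoint string
	lastUsed time.Time
}

type state struct {
	endpoints []string
	index     int
	affinity  map[string]*affinityEntry
	ttl       time.Duration
}

func (s *state) next(clientIP string) string {
	// A fresh-enough affinity record beats round robin.
	if a, ok := s.affinity[clientIP]; ok && time.Since(a.lastUsed) < s.ttl {
		a.lastUsed = time.Now()
		return a.endpoint
	}
	// Otherwise advance the rotation and remember the choice for this client.
	ep := s.endpoints[s.index]
	s.index = (s.index + 1) % len(s.endpoints)
	s.affinity[clientIP] = &affinityEntry{endpoint: ep, lastUsed: time.Now()}
	return ep
}

Note the behavioral change buried in the hunk above: the old code silently ignored a source address it could not parse, while the new code returns an error for it.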
@@ -174,12 +183,12 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
 	return result
 }
 
-//remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
-func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) {
-	for _, affinity := range lb.serviceDtlMap[service].affinityMap {
+// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
+func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) {
+	for _, affinity := range state.affinity.affinityMap {
 		if affinity.endpoint == endpoint {
-			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s", affinity.endpoint, service)
-			delete(lb.serviceDtlMap[service].affinityMap, affinity.clientIP)
+			glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
+			delete(state.affinity.affinityMap, affinity.clientIP)
 		}
 	}
 }
@@ -187,19 +196,22 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, en
 // Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
 // Then remove any session affinity records that are not in both lists.
 // This assumes the lb.lock is held.
-func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) {
+func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) {
 	allEndpoints := map[string]int{}
-	for _, validEndpoint := range newEndpoints {
-		allEndpoints[validEndpoint] = 1
+	for _, newEndpoint := range newEndpoints {
+		allEndpoints[newEndpoint] = 1
 	}
-	for _, existingEndpoint := range lb.endpointsMap[service] {
+	state, exists := lb.services[service]
+	if !exists {
+		return
+	}
+	for _, existingEndpoint := range state.endpoints {
 		allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1
 	}
 	for mKey, mVal := range allEndpoints {
 		if mVal == 1 {
-			glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service)
-			removeSessionAffinityByEndpoint(lb, service, mKey)
-			delete(lb.serviceDtlMap[service].affinityMap, mKey)
+			glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service)
+			removeSessionAffinityByEndpoint(state, service, mKey)
 		}
 	}
 }
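updateAffinityMap finds endpoints to prune with a counting trick: every endpoint in the incoming list gets a count of 1, every currently-served endpoint gets +1, so a final count of 1 means the endpoint appears in exactly one of the two lists. For brand-new endpoints the subsequent removal is a no-op (they have no affinity records yet); for vanished endpoints it drops their stale records. The trick as a hypothetical standalone helper:

// Returns the symmetric difference of two endpoint lists (elements present
// in exactly one of the two), assuming no duplicates within a list.
func symmetricDiff(current, incoming []string) []string {
	counts := map[string]int{}
	for _, ep := range incoming {
		counts[ep] = 1
	}
	for _, ep := range current {
		counts[ep]++
	}
	var out []string
	for ep, n := range counts {
		if n == 1 {
			out = append(out, ep)
		}
	}
	return out
}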
@@ -211,43 +223,64 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
 	registeredEndpoints := make(map[balancerKey]bool)
 	lb.lock.Lock()
 	defer lb.lock.Unlock()
+
 	// Update endpoints for services.
 	for _, svcEndpoints := range allEndpoints {
-		curEndpoints, exists := lb.endpointsMap[balancerKey(svcEndpoints.Name)]
+		key := balancerKey(svcEndpoints.Name)
+		state, exists := lb.services[key]
+		curEndpoints := []string{}
+		if state != nil {
+			curEndpoints = state.endpoints
+		}
 		newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints)
-		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(curEndpoints)), slice.SortStrings(newEndpoints)) {
+		if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
 			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
-			updateAffinityMap(lb, balancerKey(svcEndpoints.Name), newEndpoints)
+			lb.updateAffinityMap(key, newEndpoints)
 			// On update can be called without NewService being called externally.
-			// to be safe we will call it here. A new service will only be created
+			// To be safe we will call it here. A new service will only be created
 			// if one does not already exist.
-			lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
-			lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints)
+			state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
+			state.endpoints = slice.ShuffleStrings(newEndpoints)
+
 			// Reset the round-robin index.
-			lb.rrIndex[balancerKey(svcEndpoints.Name)] = 0
+			state.index = 0
 		}
-		registeredEndpoints[balancerKey(svcEndpoints.Name)] = true
+		registeredEndpoints[key] = true
 	}
 	// Remove endpoints missing from the update.
-	for k, v := range lb.endpointsMap {
+	for k := range lb.services {
 		if _, exists := registeredEndpoints[k]; !exists {
-			glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v)
-			delete(lb.endpointsMap, k)
-			delete(lb.serviceDtlMap, k)
+			glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k)
+			delete(lb.services, k)
 		}
 	}
 }
 
+// Tests whether two slices are equivalent. This sorts both slices in-place.
+func slicesEquiv(lhs, rhs []string) bool {
+	if len(lhs) != len(rhs) {
+		return false
+	}
+	if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
+		return true
+	}
+	return false
+}
+
 func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
 	lb.lock.Lock()
 	defer lb.lock.Unlock()
-	ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes
-	for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap {
-		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes {
-			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s. Last used is greater than %d minutes....", affinity.clientIP, service, ttlMinutes)
-			delete(lb.serviceDtlMap[balancerKey(service)].affinityMap, ip)
+
+	key := balancerKey(service)
+	state, exists := lb.services[key]
+	if !exists {
+		glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)
+		return
+	}
+	for ip, affinity := range state.affinity.affinityMap {
+		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes {
+			glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service)
+			delete(state.affinity.affinityMap, ip)
 		}
 	}
 }
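One subtlety in the new comparison path: slicesEquiv sorts both arguments in place, which is why OnUpdate passes slice.CopyStrings(curEndpoints) rather than the live slice — sorting state.endpoints directly would silently reorder the round-robin rotation, while newEndpoints is about to be shuffled and stored anyway. The length check also short-circuits the two sorts whenever the counts differ. A sketch of the caller-side contract (standard-library sort and reflect standing in for the repo's slice helpers):

import (
	"reflect"
	"sort"
)

func slicesEquivSketch(lhs, rhs []string) bool {
	if len(lhs) != len(rhs) {
		return false // cheap short-circuit before any sorting
	}
	sort.Strings(lhs) // mutates the caller's slices
	sort.Strings(rhs)
	return reflect.DeepEqual(lhs, rhs)
}

func endpointsChanged(liveRotation, newEndpoints []string) bool {
	cur := append([]string(nil), liveRotation...) // copy, like slice.CopyStrings
	return !slicesEquivSketch(cur, newEndpoints)
}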


@@ -119,7 +119,7 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
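The tests now reach the shuffled order through the pointer stored in services, so loadBalancer.services["foo"] is nil (and .endpoints would panic) for a service OnUpdate has not seen; the tests rely on the preceding OnUpdate call to create the entry. A hypothetical guard for code that cannot assume that:

var eps []string
if state := loadBalancer.services["foo"]; state != nil {
	eps = state.endpoints
}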
@@ -142,7 +142,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
@@ -157,7 +157,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
@@ -195,14 +195,14 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
 
-	shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
@@ -265,7 +265,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@@ -297,7 +297,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2)
@@ -332,7 +332,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	client1Endpoint := shuffledEndpoints[0]
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@@ -350,7 +350,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	if client1Endpoint == "endpoint:3" {
 		client1Endpoint = shuffledEndpoints[0]
 	} else if client2Endpoint == "endpoint:3" {
@@ -371,7 +371,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
 	expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
 	expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
@@ -401,7 +401,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@@ -418,7 +418,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@@ -463,7 +463,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
@@ -471,7 +471,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 
-	shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
@@ -487,7 +487,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 	}
 
 	// but bar is still there, and we continue RR from where we left off.
-	shuffledBarEndpoints = loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints = loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)