Merge pull request #4707 from thockin/balancer

Refactor RR load balancer, fix bug
This commit is contained in:
Daniel Smith 2015-02-23 13:56:40 -08:00
commit 3dc8b7ac69
2 changed files with 162 additions and 116 deletions

View File

@ -18,6 +18,7 @@ package proxy
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"reflect" "reflect"
"strconv" "strconv"
@ -34,62 +35,76 @@ var (
ErrMissingEndpoints = errors.New("missing endpoints") ErrMissingEndpoints = errors.New("missing endpoints")
) )
type sessionAffinityDetail struct { type affinityState struct {
clientIPAddress string clientIP string
//clientProtocol api.Protocol //not yet used //clientProtocol api.Protocol //not yet used
//sessionCookie string //not yet used //sessionCookie string //not yet used
endpoint string endpoint string
lastUsedDTTM time.Time lastUsed time.Time
} }
type serviceDetail struct { type affinityPolicy struct {
name string affinityType api.AffinityType
sessionAffinityType api.AffinityType affinityMap map[string]*affinityState // map client IP -> affinity info
sessionAffinityMap map[string]*sessionAffinityDetail ttlMinutes int
stickyMaxAgeMinutes int
} }
// balancerKey is a string that the balancer uses to key stored state.
type balancerKey string
// LoadBalancerRR is a round-robin load balancer. // LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct { type LoadBalancerRR struct {
lock sync.RWMutex lock sync.RWMutex
endpointsMap map[string][]string services map[balancerKey]*balancerState
rrIndex map[string]int
serviceDtlMap map[string]serviceDetail
} }
func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail { type balancerState struct {
return &serviceDetail{ endpoints []string
name: service, index int
sessionAffinityType: sessionAffinityType, affinity affinityPolicy
sessionAffinityMap: make(map[string]*sessionAffinityDetail), }
stickyMaxAgeMinutes: stickyMaxAgeMinutes,
func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy {
return &affinityPolicy{
affinityType: affinityType,
affinityMap: make(map[string]*affinityState),
ttlMinutes: ttlMinutes,
} }
} }
// NewLoadBalancerRR returns a new LoadBalancerRR. // NewLoadBalancerRR returns a new LoadBalancerRR.
func NewLoadBalancerRR() *LoadBalancerRR { func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{ return &LoadBalancerRR{
endpointsMap: make(map[string][]string), services: map[balancerKey]*balancerState{},
rrIndex: make(map[string]int),
serviceDtlMap: make(map[string]serviceDetail),
} }
} }
func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error { func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
if stickyMaxAgeMinutes == 0 { lb.lock.Lock()
stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? defer lb.lock.Unlock()
}
if _, exists := lb.serviceDtlMap[service]; !exists { lb.newServiceInternal(service, affinityType, ttlMinutes)
lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes)
glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service])
}
return nil return nil
} }
// return true if this service detail is using some form of session affinity. // This assumes that lb.lock is already held.
func isSessionAffinity(serviceDtl serviceDetail) bool { func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
//Should never be empty string, but chekcing for it to be safe. if ttlMinutes == 0 {
if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone { ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
}
key := balancerKey(service)
if _, exists := lb.services[key]; !exists {
lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
}
return lb.services[key]
}
// return true if this service is using some form of session affinity.
func isSessionAffinity(affinity *affinityPolicy) bool {
// Should never be empty string, but checking for it to be safe.
if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone {
return false return false
} }
return true return true
@ -98,54 +113,57 @@ func isSessionAffinity(serviceDtl serviceDetail) bool {
// NextEndpoint returns a service endpoint. // NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm. // The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
var ipaddr string // Coarse locking is simple. We can get more fine-grained if/when we
glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap) // can prove it matters.
lb.lock.Lock()
defer lb.lock.Unlock()
lb.lock.RLock() key := balancerKey(service)
serviceDtls, exists := lb.serviceDtlMap[service] state, exists := lb.services[key]
endpoints, _ := lb.endpointsMap[service] if !exists || state == nil {
index := lb.rrIndex[service]
sessionAffinityEnabled := isSessionAffinity(serviceDtls)
lb.lock.RUnlock()
if !exists {
return "", ErrMissingServiceEntry return "", ErrMissingServiceEntry
} }
if len(endpoints) == 0 { if len(state.endpoints) == 0 {
return "", ErrMissingEndpoints return "", ErrMissingEndpoints
} }
glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints)
sessionAffinityEnabled := isSessionAffinity(&state.affinity)
var ipaddr string
if sessionAffinityEnabled { if sessionAffinityEnabled {
if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { // Caution: don't shadow ipaddr
ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) var err error
ipaddr, _, err = net.SplitHostPort(srcAddr.String())
if err != nil {
return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
} }
sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr] sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes { // Affinity wins.
endpoint := sessionAffinity.endpoint endpoint := sessionAffinity.endpoint
sessionAffinity.lastUsedDTTM = time.Now() sessionAffinity.lastUsed = time.Now()
glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint)
return endpoint, nil return endpoint, nil
} }
} }
endpoint := endpoints[index] // Take the next endpoint.
lb.lock.Lock() endpoint := state.endpoints[state.index]
lb.rrIndex[service] = (index + 1) % len(endpoints) state.index = (state.index + 1) % len(state.endpoints)
if sessionAffinityEnabled { if sessionAffinityEnabled {
var affinity *sessionAffinityDetail var affinity *affinityState
affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] affinity = state.affinity.affinityMap[ipaddr]
if affinity == nil { if affinity == nil {
affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()} affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity state.affinity.affinityMap[ipaddr] = affinity
} }
affinity.lastUsedDTTM = time.Now() affinity.lastUsed = time.Now()
affinity.endpoint = endpoint affinity.endpoint = endpoint
affinity.clientIPAddress = ipaddr affinity.clientIP = ipaddr
glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr])
glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr])
} }
lb.lock.Unlock()
return endpoint, nil return endpoint, nil
} }
@ -166,31 +184,35 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
return result return result
} }
//remove any session affinity records associated to a particular endpoint (for example when a pod goes down). // Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) { func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) {
for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { for _, affinity := range state.affinity.affinityMap {
if affinityDetail.endpoint == endpoint { if affinity.endpoint == endpoint {
glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service) glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress) delete(state.affinity.affinityMap, affinity.clientIP)
} }
} }
} }
//Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
// Then remove any session affinity records that are not in both lists. // Then remove any session affinity records that are not in both lists.
func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) { // This assumes the lb.lock is held.
func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) {
allEndpoints := map[string]int{} allEndpoints := map[string]int{}
for _, validEndpoint := range validEndpoints { for _, newEndpoint := range newEndpoints {
allEndpoints[validEndpoint] = 1 allEndpoints[newEndpoint] = 1
} }
for _, existingEndpoint := range lb.endpointsMap[service] { state, exists := lb.services[service]
if !exists {
return
}
for _, existingEndpoint := range state.endpoints {
allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1 allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1
} }
for mKey, mVal := range allEndpoints { for mKey, mVal := range allEndpoints {
if mVal == 1 { if mVal == 1 {
glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service)
removeSessionAffinityByEndpoint(lb, service, mKey) removeSessionAffinityByEndpoint(state, service, mKey)
delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey)
} }
} }
} }
@ -198,44 +220,68 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints [
// OnUpdate manages the registered service endpoints. // OnUpdate manages the registered service endpoints.
// Registered endpoints are updated if found in the update set or // Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set. // unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[string]bool) registeredEndpoints := make(map[balancerKey]bool)
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
// Update endpoints for services. // Update endpoints for services.
for _, endpoint := range endpoints { for _, svcEndpoints := range allEndpoints {
existingEndpoints, exists := lb.endpointsMap[endpoint.Name] key := balancerKey(svcEndpoints.Name)
validEndpoints := filterValidEndpoints(endpoint.Endpoints) state, exists := lb.services[key]
if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) { curEndpoints := []string{}
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints) if state != nil {
updateServiceDetailMap(lb, endpoint.Name, validEndpoints) curEndpoints = state.endpoints
}
newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints)
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
lb.updateAffinityMap(key, newEndpoints)
// On update can be called without NewService being called externally. // On update can be called without NewService being called externally.
// to be safe we will call it here. A new service will only be created // To be safe we will call it here. A new service will only be created
// if one does not already exist. // if one does not already exist.
lb.NewService(endpoint.Name, api.AffinityTypeNone, 0) state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
lb.endpointsMap[endpoint.Name] = slice.ShuffleStrings(validEndpoints) state.endpoints = slice.ShuffleStrings(newEndpoints)
// Reset the round-robin index. // Reset the round-robin index.
lb.rrIndex[endpoint.Name] = 0 state.index = 0
} }
registeredEndpoints[endpoint.Name] = true registeredEndpoints[key] = true
} }
// Remove endpoints missing from the update. // Remove endpoints missing from the update.
for k, v := range lb.endpointsMap { for k := range lb.services {
if _, exists := registeredEndpoints[k]; !exists { if _, exists := registeredEndpoints[k]; !exists {
glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v) glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k)
delete(lb.endpointsMap, k) delete(lb.services, k)
delete(lb.serviceDtlMap, k)
} }
} }
} }
// Tests whether two slices are equivalent. This sorts both slices in-place.
func slicesEquiv(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {
return false
}
if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
return true
}
return false
}
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes lb.lock.Lock()
for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { defer lb.lock.Unlock()
if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes {
glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes) key := balancerKey(service)
delete(lb.serviceDtlMap[service].sessionAffinityMap, key) state, exists := lb.services[key]
if !exists {
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)
return
}
for ip, affinity := range state.affinity.affinityMap {
if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes {
glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service)
delete(state.affinity.affinityMap, ip)
} }
} }
} }

View File

@ -119,7 +119,7 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"] shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
@ -142,7 +142,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"] shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
@ -157,7 +157,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"] shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
@ -195,14 +195,14 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
@ -265,7 +265,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"] shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@ -297,7 +297,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"] shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2)
@ -332,7 +332,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"] shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0] client1Endpoint := shuffledEndpoints[0]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@ -350,7 +350,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"] shuffledEndpoints = loadBalancer.services["foo"].endpoints
if client1Endpoint == "endpoint:3" { if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0] client1Endpoint = shuffledEndpoints[0]
} else if client2Endpoint == "endpoint:3" { } else if client2Endpoint == "endpoint:3" {
@ -371,7 +371,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"] shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
@ -401,7 +401,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.endpointsMap["foo"] shuffledEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@ -418,7 +418,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.endpointsMap["foo"] shuffledEndpoints = loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@ -463,7 +463,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
}, },
} }
loadBalancer.OnUpdate(endpoints) loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
@ -471,7 +471,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
@ -487,7 +487,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
} }
// but bar is still there, and we continue RR from where we left off. // but bar is still there, and we continue RR from where we left off.
shuffledBarEndpoints = loadBalancer.endpointsMap["bar"] shuffledBarEndpoints = loadBalancer.services["bar"].endpoints
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)