From 362106d8eb1cf93f24315a57679ace6fc1b3f68a Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Sat, 21 Feb 2015 22:19:50 -0800 Subject: [PATCH] Refactor load balancer. Flatten 3 parallel maps into a map to structs (balancerState). --- pkg/proxy/roundrobin.go | 165 +++++++++++++++++++++-------------- pkg/proxy/roundrobin_test.go | 30 +++---- 2 files changed, 114 insertions(+), 81 deletions(-) diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 6fc974fbded..9af7cc990e2 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -18,6 +18,7 @@ package proxy import ( "errors" + "fmt" "net" "reflect" "strconv" @@ -53,10 +54,14 @@ type balancerKey string // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { - lock sync.RWMutex - endpointsMap map[balancerKey][]string - rrIndex map[balancerKey]int - serviceDtlMap map[balancerKey]affinityPolicy + lock sync.RWMutex + services map[balancerKey]*balancerState +} + +type balancerState struct { + endpoints []string + index int + affinity affinityPolicy } func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy { @@ -70,9 +75,7 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP // NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ - endpointsMap: make(map[balancerKey][]string), - rrIndex: make(map[balancerKey]int), - serviceDtlMap: make(map[balancerKey]affinityPolicy), + services: map[balancerKey]*balancerState{}, } } @@ -84,19 +87,22 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy return nil } -func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) { +func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState { if ttlMinutes == 0 { ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? } - if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists { - lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes) - glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)]) + + key := balancerKey(service) + if _, exists := lb.services[key]; !exists { + lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} + glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service) } + return lb.services[key] } // return true if this service is using some form of session affinity. func isSessionAffinity(affinity *affinityPolicy) bool { - //Should never be empty string, but chekcing for it to be safe. + // Should never be empty string, but checking for it to be safe. if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone { return false } @@ -106,54 +112,57 @@ func isSessionAffinity(affinity *affinityPolicy) bool { // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { - var ipaddr string - glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap) + // Coarse locking is simple. We can get more fine-grained if/when we + // can prove it matters. + lb.lock.Lock() + defer lb.lock.Unlock() - lb.lock.RLock() - serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)] - endpoints, _ := lb.endpointsMap[balancerKey(service)] - index := lb.rrIndex[balancerKey(service)] - sessionAffinityEnabled := isSessionAffinity(&serviceDtls) - - lb.lock.RUnlock() - if !exists { + key := balancerKey(service) + state, exists := lb.services[key] + if !exists || state == nil { return "", ErrMissingServiceEntry } - if len(endpoints) == 0 { + if len(state.endpoints) == 0 { return "", ErrMissingEndpoints } + glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints) + + sessionAffinityEnabled := isSessionAffinity(&state.affinity) + + var ipaddr string if sessionAffinityEnabled { - if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { - ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) + // Caution: don't shadow ipaddr + var err error + ipaddr, _, err = net.SplitHostPort(srcAddr.String()) + if err != nil { + return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err) } - sessionAffinity, exists := serviceDtls.affinityMap[ipaddr] - glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) - if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < serviceDtls.ttlMinutes { + sessionAffinity, exists := state.affinity.affinityMap[ipaddr] + if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { + // Affinity wins. endpoint := sessionAffinity.endpoint sessionAffinity.lastUsed = time.Now() - glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) + glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint) return endpoint, nil } } - endpoint := endpoints[index] - lb.lock.Lock() - lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints) + // Take the next endpoint. + endpoint := state.endpoints[state.index] + state.index = (state.index + 1) % len(state.endpoints) if sessionAffinityEnabled { var affinity *affinityState - affinity, _ = lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] + affinity = state.affinity.affinityMap[ipaddr] if affinity == nil { affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()} - lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] = affinity + state.affinity.affinityMap[ipaddr] = affinity } affinity.lastUsed = time.Now() affinity.endpoint = endpoint affinity.clientIP = ipaddr - - glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr]) + glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr]) } - lb.lock.Unlock() return endpoint, nil } @@ -174,12 +183,12 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string { return result } -//remove any session affinity records associated to a particular endpoint (for example when a pod goes down). -func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) { - for _, affinity := range lb.serviceDtlMap[service].affinityMap { +// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). +func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) { + for _, affinity := range state.affinity.affinityMap { if affinity.endpoint == endpoint { - glog.V(4).Infof("Removing client: %s from affinityMap for service: %s", affinity.endpoint, service) - delete(lb.serviceDtlMap[service].affinityMap, affinity.clientIP) + glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service) + delete(state.affinity.affinityMap, affinity.clientIP) } } } @@ -187,19 +196,22 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, en // Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Then remove any session affinity records that are not in both lists. // This assumes the lb.lock is held. -func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) { +func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) { allEndpoints := map[string]int{} - for _, validEndpoint := range newEndpoints { - allEndpoints[validEndpoint] = 1 + for _, newEndpoint := range newEndpoints { + allEndpoints[newEndpoint] = 1 } - for _, existingEndpoint := range lb.endpointsMap[service] { + state, exists := lb.services[service] + if !exists { + return + } + for _, existingEndpoint := range state.endpoints { allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1 } for mKey, mVal := range allEndpoints { if mVal == 1 { - glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) - removeSessionAffinityByEndpoint(lb, service, mKey) - delete(lb.serviceDtlMap[service].affinityMap, mKey) + glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service) + removeSessionAffinityByEndpoint(state, service, mKey) } } } @@ -211,43 +223,64 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { registeredEndpoints := make(map[balancerKey]bool) lb.lock.Lock() defer lb.lock.Unlock() + // Update endpoints for services. for _, svcEndpoints := range allEndpoints { - curEndpoints, exists := lb.endpointsMap[balancerKey(svcEndpoints.Name)] + key := balancerKey(svcEndpoints.Name) + state, exists := lb.services[key] + curEndpoints := []string{} + if state != nil { + curEndpoints = state.endpoints + } newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints) - if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(curEndpoints)), slice.SortStrings(newEndpoints)) { + if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints) - updateAffinityMap(lb, balancerKey(svcEndpoints.Name), newEndpoints) + lb.updateAffinityMap(key, newEndpoints) // On update can be called without NewService being called externally. - // to be safe we will call it here. A new service will only be created + // To be safe we will call it here. A new service will only be created // if one does not already exist. - lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0) - lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints) + state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) // Reset the round-robin index. - lb.rrIndex[balancerKey(svcEndpoints.Name)] = 0 + state.index = 0 } - registeredEndpoints[balancerKey(svcEndpoints.Name)] = true + registeredEndpoints[key] = true } // Remove endpoints missing from the update. - for k, v := range lb.endpointsMap { + for k := range lb.services { if _, exists := registeredEndpoints[k]; !exists { - glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v) - delete(lb.endpointsMap, k) - delete(lb.serviceDtlMap, k) + glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k) + delete(lb.services, k) } } } +// Tests whether two slices are equivalent. This sorts both slices in-place. +func slicesEquiv(lhs, rhs []string) bool { + if len(lhs) != len(rhs) { + return false + } + if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) { + return true + } + return false +} + func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { lb.lock.Lock() defer lb.lock.Unlock() - ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes - for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap { - if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes { - glog.V(4).Infof("Removing client: %s from affinityMap for service: %s. Last used is greater than %d minutes....", affinity.clientIP, service, ttlMinutes) - delete(lb.serviceDtlMap[balancerKey(service)].affinityMap, ip) + key := balancerKey(service) + state, exists := lb.services[key] + if !exists { + glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service) + return + } + for ip, affinity := range state.affinity.affinityMap { + if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes { + glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service) + delete(state.affinity.affinityMap, ip) } } } diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go index 3298aa92bd5..c47e07eab06 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/roundrobin_test.go @@ -119,7 +119,7 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] + shuffledEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) @@ -142,7 +142,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] + shuffledEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) @@ -157,7 +157,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] + shuffledEndpoints = loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) @@ -195,14 +195,14 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] + shuffledFooEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) - shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] + shuffledBarEndpoints := loadBalancer.services["bar"].endpoints expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) @@ -265,7 +265,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] + shuffledEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) @@ -297,7 +297,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] + shuffledEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2) @@ -332,7 +332,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] + shuffledEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) @@ -350,7 +350,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] + shuffledEndpoints = loadBalancer.services["foo"].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] } else if client2Endpoint == "endpoint:3" { @@ -371,7 +371,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] + shuffledEndpoints = loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) @@ -401,7 +401,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.endpointsMap["foo"] + shuffledEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) @@ -418,7 +418,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.endpointsMap["foo"] + shuffledEndpoints = loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) @@ -463,7 +463,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] + shuffledFooEndpoints := loadBalancer.services["foo"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) @@ -471,7 +471,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] + shuffledBarEndpoints := loadBalancer.services["bar"].endpoints expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) @@ -487,7 +487,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { } // but bar is still there, and we continue RR from where we left off. - shuffledBarEndpoints = loadBalancer.endpointsMap["bar"] + shuffledBarEndpoints = loadBalancer.services["bar"].endpoints expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)