From 37b8c14a4f8a796e6b680f6de91890299b396e4e Mon Sep 17 00:00:00 2001
From: Tim Hockin
Date: Sat, 21 Feb 2015 15:30:36 -0800
Subject: [PATCH 1/4] use a strong type for loadbalancer keys

---
 pkg/proxy/roundrobin.go | 57 ++++++++++++++++++++++-------------------
 1 file changed, 30 insertions(+), 27 deletions(-)

diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go
index 070c82ee70f..cc3926478d0 100644
--- a/pkg/proxy/roundrobin.go
+++ b/pkg/proxy/roundrobin.go
@@ -49,12 +49,15 @@ type serviceDetail struct {
 	stickyMaxAgeMinutes int
 }

+// balancerKey is a string that the balancer uses to key stored state.
+type balancerKey string
+
 // LoadBalancerRR is a round-robin load balancer.
 type LoadBalancerRR struct {
 	lock          sync.RWMutex
-	endpointsMap  map[string][]string
-	rrIndex       map[string]int
-	serviceDtlMap map[string]serviceDetail
+	endpointsMap  map[balancerKey][]string
+	rrIndex       map[balancerKey]int
+	serviceDtlMap map[balancerKey]serviceDetail
 }

 func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail {
@@ -69,9 +72,9 @@ func newServiceDetail(service string, sessionAffinityType api.AffinityType, stic
 // NewLoadBalancerRR returns a new LoadBalancerRR.
 func NewLoadBalancerRR() *LoadBalancerRR {
 	return &LoadBalancerRR{
-		endpointsMap:  make(map[string][]string),
-		rrIndex:       make(map[string]int),
-		serviceDtlMap: make(map[string]serviceDetail),
+		endpointsMap:  make(map[balancerKey][]string),
+		rrIndex:       make(map[balancerKey]int),
+		serviceDtlMap: make(map[balancerKey]serviceDetail),
 	}
 }

@@ -79,9 +82,9 @@ func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.Aff
 	if stickyMaxAgeMinutes == 0 {
 		stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
 	}
-	if _, exists := lb.serviceDtlMap[service]; !exists {
-		lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes)
-		glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service])
+	if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists {
+		lb.serviceDtlMap[balancerKey(service)] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes)
+		glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)])
 	}
 	return nil
 }
@@ -102,9 +105,9 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
 	glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap)

 	lb.lock.RLock()
-	serviceDtls, exists := lb.serviceDtlMap[service]
-	endpoints, _ := lb.endpointsMap[service]
-	index := lb.rrIndex[service]
+	serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)]
+	endpoints, _ := lb.endpointsMap[balancerKey(service)]
+	index := lb.rrIndex[balancerKey(service)]
 	sessionAffinityEnabled := isSessionAffinity(serviceDtls)

 	lb.lock.RUnlock()
@@ -129,20 +132,20 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
 	}
 	endpoint := endpoints[index]
 	lb.lock.Lock()
-	lb.rrIndex[service] = (index + 1) % len(endpoints)
+	lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints)

 	if sessionAffinityEnabled {
 		var affinity *sessionAffinityDetail
-		affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr]
+		affinity, _ = lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr]
 		if affinity == nil {
 			affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()}
-			lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity
+			lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] = affinity
 		}
 		affinity.lastUsedDTTM = time.Now()
 		affinity.endpoint = endpoint
 		affinity.clientIPAddress = ipaddr

-		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr])
+		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr])
 	}
 	lb.lock.Unlock()
@@ -167,7 +170,7 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
 }

 //remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
-func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) {
+func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) {
 	for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap {
 		if affinityDetail.endpoint == endpoint {
 			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service)
@@ -178,7 +181,7 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoin

 //Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
 // Then remove any session affinity records that are not in both lists.
-func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) {
+func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoints []string) {
 	allEndpoints := map[string]int{}
 	for _, validEndpoint := range validEndpoints {
 		allEndpoints[validEndpoint] = 1
@@ -199,26 +202,26 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints [
 // OnUpdate manages the registered service endpoints.
 // Registered endpoints are updated if found in the update set or
 // unregistered if missing from the update set.
 func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
-	registeredEndpoints := make(map[string]bool)
+	registeredEndpoints := make(map[balancerKey]bool)
 	lb.lock.Lock()
 	defer lb.lock.Unlock()
 	// Update endpoints for services.
 	for _, endpoint := range endpoints {
-		existingEndpoints, exists := lb.endpointsMap[endpoint.Name]
+		existingEndpoints, exists := lb.endpointsMap[balancerKey(endpoint.Name)]
 		validEndpoints := filterValidEndpoints(endpoint.Endpoints)
 		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) {
 			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints)
-			updateServiceDetailMap(lb, endpoint.Name, validEndpoints)
+			updateServiceDetailMap(lb, balancerKey(endpoint.Name), validEndpoints)
 			// On update can be called without NewService being called externally.
 			// to be safe we will call it here. A new service will only be created
 			// if one does not already exist.
 			lb.NewService(endpoint.Name, api.AffinityTypeNone, 0)
-			lb.endpointsMap[endpoint.Name] = slice.ShuffleStrings(validEndpoints)
+			lb.endpointsMap[balancerKey(endpoint.Name)] = slice.ShuffleStrings(validEndpoints)
 			// Reset the round-robin index.
-			lb.rrIndex[endpoint.Name] = 0
+			lb.rrIndex[balancerKey(endpoint.Name)] = 0
 		}
-		registeredEndpoints[endpoint.Name] = true
+		registeredEndpoints[balancerKey(endpoint.Name)] = true
 	}
 	// Remove endpoints missing from the update.
 	for k, v := range lb.endpointsMap {
@@ -231,11 +234,11 @@ func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
 }

 func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
-	stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes
-	for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap {
+	stickyMaxAgeMinutes := lb.serviceDtlMap[balancerKey(service)].stickyMaxAgeMinutes
+	for key, affinityDetail := range lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap {
 		if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes {
 			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes)
-			delete(lb.serviceDtlMap[service].sessionAffinityMap, key)
+			delete(lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap, key)
 		}
 	}
 }

From e88134f7368a9ca95a803cf3ec2ee917a2c258e5 Mon Sep 17 00:00:00 2001
From: Tim Hockin
Date: Sat, 21 Feb 2015 21:34:50 -0800
Subject: [PATCH 2/4] Some renames in round-robin balancer

Making a clear path for more service port changes, making this code
more comprehensible.
---
 pkg/proxy/roundrobin.go | 114 ++++++++++++++++++++--------------------
 1 file changed, 56 insertions(+), 58 deletions(-)

diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go
index cc3926478d0..07d4688f20d 100644
--- a/pkg/proxy/roundrobin.go
+++ b/pkg/proxy/roundrobin.go
@@ -34,19 +34,18 @@ var (
 	ErrMissingEndpoints = errors.New("missing endpoints")
 )

-type sessionAffinityDetail struct {
-	clientIPAddress string
+type affinityState struct {
+	clientIP string
 	//clientProtocol api.Protocol //not yet used
 	//sessionCookie string //not yet used
-	endpoint string
-	lastUsedDTTM time.Time
+	endpoint string
+	lastUsed time.Time
 }

-type serviceDetail struct {
-	name string
-	sessionAffinityType api.AffinityType
-	sessionAffinityMap map[string]*sessionAffinityDetail
-	stickyMaxAgeMinutes int
+type affinityPolicy struct {
+	affinityType api.AffinityType
+	affinityMap map[string]*affinityState // map client IP -> affinity info
+	ttlMinutes int
 }

 // balancerKey is a string that the balancer uses to key stored state.
@@ -57,15 +56,14 @@ type LoadBalancerRR struct {
 	lock          sync.RWMutex
 	endpointsMap  map[balancerKey][]string
 	rrIndex       map[balancerKey]int
-	serviceDtlMap map[balancerKey]serviceDetail
+	serviceDtlMap map[balancerKey]affinityPolicy
 }

-func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail {
-	return &serviceDetail{
-		name:                service,
-		sessionAffinityType: sessionAffinityType,
-		sessionAffinityMap:  make(map[string]*sessionAffinityDetail),
-		stickyMaxAgeMinutes: stickyMaxAgeMinutes,
+func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy {
+	return &affinityPolicy{
+		affinityType: affinityType,
+		affinityMap:  make(map[string]*affinityState),
+		ttlMinutes:   ttlMinutes,
 	}
 }

@@ -74,25 +72,25 @@ func NewLoadBalancerRR() *LoadBalancerRR {
 	return &LoadBalancerRR{
 		endpointsMap:  make(map[balancerKey][]string),
 		rrIndex:       make(map[balancerKey]int),
-		serviceDtlMap: make(map[balancerKey]serviceDetail),
+		serviceDtlMap: make(map[balancerKey]affinityPolicy),
 	}
 }

-func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error {
-	if stickyMaxAgeMinutes == 0 {
-		stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
+func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
+	if ttlMinutes == 0 {
+		ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
 	}
 	if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists {
-		lb.serviceDtlMap[balancerKey(service)] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes)
+		lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes)
 		glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)])
 	}
 	return nil
 }

-// return true if this service detail is using some form of session affinity.
-func isSessionAffinity(serviceDtl serviceDetail) bool {
+// return true if this service is using some form of session affinity.
+func isSessionAffinity(affinity *affinityPolicy) bool {
 	//Should never be empty string, but chekcing for it to be safe.
-	if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone {
+	if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone {
 		return false
 	}
 	return true
@@ -108,7 +106,7 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
 	serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)]
 	endpoints, _ := lb.endpointsMap[balancerKey(service)]
 	index := lb.rrIndex[balancerKey(service)]
-	sessionAffinityEnabled := isSessionAffinity(serviceDtls)
+	sessionAffinityEnabled := isSessionAffinity(&serviceDtls)

 	lb.lock.RUnlock()
 	if !exists {
@@ -121,11 +119,11 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
 	if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil {
 		ipaddr, _, _ = net.SplitHostPort(srcAddr.String())
 	}
-	sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr]
+	sessionAffinity, exists := serviceDtls.affinityMap[ipaddr]
 	glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
-	if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes {
+	if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < serviceDtls.ttlMinutes {
 		endpoint := sessionAffinity.endpoint
-		sessionAffinity.lastUsedDTTM = time.Now()
+		sessionAffinity.lastUsed = time.Now()
 		glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
 		return endpoint, nil
 	}
@@ -135,17 +133,17 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string
 	lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints)

 	if sessionAffinityEnabled {
-		var affinity *sessionAffinityDetail
-		affinity, _ = lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr]
+		var affinity *affinityState
+		affinity, _ = lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr]
 		if affinity == nil {
-			affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()}
-			lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] = affinity
+			affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
+			lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] = affinity
 		}
-		affinity.lastUsedDTTM = time.Now()
+		affinity.lastUsed = time.Now()
 		affinity.endpoint = endpoint
-		affinity.clientIPAddress = ipaddr
+		affinity.clientIP = ipaddr

-		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr])
+		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr])
 	}
 	lb.lock.Unlock()
@@ -171,19 +169,19 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
 //remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
 func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) {
-	for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap {
-		if affinityDetail.endpoint == endpoint {
-			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service)
-			delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress)
+	for _, affinity := range lb.serviceDtlMap[service].affinityMap {
+		if affinity.endpoint == endpoint {
+			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s", affinity.endpoint, service)
+			delete(lb.serviceDtlMap[service].affinityMap, affinity.clientIP)
 		}
 	}
 }

 //Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
 // Then remove any session affinity records that are not in both lists.
-func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoints []string) {
+func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) {
 	allEndpoints := map[string]int{}
-	for _, validEndpoint := range validEndpoints {
+	for _, validEndpoint := range newEndpoints {
 		allEndpoints[validEndpoint] = 1
 	}
 	for _, existingEndpoint := range lb.endpointsMap[service] {
@@ -193,7 +191,7 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoi
 		if mVal == 1 {
 			glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service)
 			removeSessionAffinityByEndpoint(lb, service, mKey)
-			delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey)
+			delete(lb.serviceDtlMap[service].affinityMap, mKey)
 		}
 	}
 }
@@ -201,27 +199,27 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoi
 // OnUpdate manages the registered service endpoints.
 // Registered endpoints are updated if found in the update set or
 // unregistered if missing from the update set.
-func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
+func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
 	registeredEndpoints := make(map[balancerKey]bool)
 	lb.lock.Lock()
 	defer lb.lock.Unlock()
 	// Update endpoints for services.
-	for _, endpoint := range endpoints {
-		existingEndpoints, exists := lb.endpointsMap[balancerKey(endpoint.Name)]
-		validEndpoints := filterValidEndpoints(endpoint.Endpoints)
-		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) {
-			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints)
-			updateServiceDetailMap(lb, balancerKey(endpoint.Name), validEndpoints)
+	for _, svcEndpoints := range allEndpoints {
+		curEndpoints, exists := lb.endpointsMap[balancerKey(svcEndpoints.Name)]
+		newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints)
+		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(curEndpoints)), slice.SortStrings(newEndpoints)) {
+			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
+			updateAffinityMap(lb, balancerKey(svcEndpoints.Name), newEndpoints)
 			// On update can be called without NewService being called externally.
 			// to be safe we will call it here. A new service will only be created
 			// if one does not already exist.
-			lb.NewService(endpoint.Name, api.AffinityTypeNone, 0)
-			lb.endpointsMap[balancerKey(endpoint.Name)] = slice.ShuffleStrings(validEndpoints)
+			lb.NewService(svcEndpoints.Name, api.AffinityTypeNone, 0)
+			lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints)
 			// Reset the round-robin index.
-			lb.rrIndex[balancerKey(endpoint.Name)] = 0
+			lb.rrIndex[balancerKey(svcEndpoints.Name)] = 0
 		}
-		registeredEndpoints[balancerKey(endpoint.Name)] = true
+		registeredEndpoints[balancerKey(svcEndpoints.Name)] = true
 	}
 	// Remove endpoints missing from the update.
 	for k, v := range lb.endpointsMap {
@@ -234,11 +232,11 @@ func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
 }

 func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
-	stickyMaxAgeMinutes := lb.serviceDtlMap[balancerKey(service)].stickyMaxAgeMinutes
-	for key, affinityDetail := range lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap {
-		if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes {
-			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes)
-			delete(lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap, key)
+	ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes
+	for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap {
+		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes {
+			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s. Last used is greater than %d minutes....", affinity.clientIP, service, ttlMinutes)
+			delete(lb.serviceDtlMap[balancerKey(service)].affinityMap, ip)
 		}
 	}
 }

From 8503c3469434ee727999aac7aa69ebb3573a8f63 Mon Sep 17 00:00:00 2001
From: Tim Hockin
Date: Sat, 21 Feb 2015 21:53:40 -0800
Subject: [PATCH 3/4] Fix unprotected shared state in load balancer

---
 pkg/proxy/roundrobin.go | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go
index 07d4688f20d..ada3230fba9 100644
--- a/pkg/proxy/roundrobin.go
+++ b/pkg/proxy/roundrobin.go
@@ -77,6 +77,15 @@ func NewLoadBalancerRR() *LoadBalancerRR {
 }

 func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
+	lb.lock.Lock()
+	defer lb.lock.Unlock()
+
+	lb.newServiceInternal(service, affinityType, ttlMinutes)
+	return nil
+}
+
+// This assumes that lb.lock is already held.
+func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) {
 	if ttlMinutes == 0 {
 		ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
 	}
@@ -84,7 +93,6 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy
 		lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes)
 		glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)])
 	}
-	return nil
 }

 // return true if this service is using some form of session affinity.
@@ -177,8 +185,9 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, en
 	}
 }

-//Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
-// Then remove any session affinity records that are not in both lists.
+// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
+// Then remove any session affinity records that are not in both lists.
+// This assumes the lb.lock is held.
 func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) {
 	allEndpoints := map[string]int{}
 	for _, validEndpoint := range newEndpoints {
 		allEndpoints[validEndpoint] = 1
@@ -213,7 +222,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
 			// On update can be called without NewService being called externally.
 			// to be safe we will call it here. A new service will only be created
 			// if one does not already exist.
-			lb.NewService(svcEndpoints.Name, api.AffinityTypeNone, 0)
+			lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
 			lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints)
 			// Reset the round-robin index.
@@ -232,6 +241,9 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
 }

 func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
+	lb.lock.Lock()
+	defer lb.lock.Unlock()
+
 	ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes
 	for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap {
 		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes {

From 5dc98968c3c9c195ba46f6fc84661927921326d8 Mon Sep 17 00:00:00 2001
From: Tim Hockin
Date: Sat, 21 Feb 2015 22:19:50 -0800
Subject: [PATCH 4/4] Refactor load balancer.

Flatten 3 parallel maps into a map to structs (balancerState).
---
 pkg/proxy/roundrobin.go      | 165 +++++++++++++++++++++--------------
 pkg/proxy/roundrobin_test.go |  30 +++----
 2 files changed, 114 insertions(+), 81 deletions(-)

diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go
index ada3230fba9..1afc9f2762c 100644
--- a/pkg/proxy/roundrobin.go
+++ b/pkg/proxy/roundrobin.go
@@ -18,6 +18,7 @@ package proxy

 import (
 	"errors"
+	"fmt"
 	"net"
 	"reflect"
 	"strconv"
@@ -53,10 +54,14 @@ type balancerKey string

 // LoadBalancerRR is a round-robin load balancer.
 type LoadBalancerRR struct {
-	lock          sync.RWMutex
-	endpointsMap  map[balancerKey][]string
-	rrIndex       map[balancerKey]int
-	serviceDtlMap map[balancerKey]affinityPolicy
+	lock     sync.RWMutex
+	services map[balancerKey]*balancerState
+}
+
+type balancerState struct {
+	endpoints []string
+	index     int
+	affinity  affinityPolicy
 }

 func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy {
@@ -70,9 +75,7 @@ func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityP
 // NewLoadBalancerRR returns a new LoadBalancerRR.
 func NewLoadBalancerRR() *LoadBalancerRR {
 	return &LoadBalancerRR{
-		endpointsMap:  make(map[balancerKey][]string),
-		rrIndex:       make(map[balancerKey]int),
-		serviceDtlMap: make(map[balancerKey]affinityPolicy),
+		services: map[balancerKey]*balancerState{},
 	}
 }

@@ -85,19 +88,22 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy
 }

 // This assumes that lb.lock is already held.
-func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) {
+func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
 	if ttlMinutes == 0 {
 		ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
 	}
-	if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists {
-		lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes)
-		glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)])
+
+	key := balancerKey(service)
+	if _, exists := lb.services[key]; !exists {
+		lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
+		glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
 	}
+	return lb.services[key]
 }

 // return true if this service is using some form of session affinity.
 func isSessionAffinity(affinity *affinityPolicy) bool {
-	//Should never be empty string, but chekcing for it to be safe.
+	// Should never be empty string, but checking for it to be safe.
 	if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone {
 		return false
 	}
 	return true
@@ -107,54 +113,57 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
 // NextEndpoint returns a service endpoint.
 // The service endpoint is chosen using the round-robin algorithm.
 func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
-	var ipaddr string
-	glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap)
+	// Coarse locking is simple. We can get more fine-grained if/when we
+	// can prove it matters.
+	lb.lock.Lock()
+	defer lb.lock.Unlock()

-	lb.lock.RLock()
-	serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)]
-	endpoints, _ := lb.endpointsMap[balancerKey(service)]
-	index := lb.rrIndex[balancerKey(service)]
-	sessionAffinityEnabled := isSessionAffinity(&serviceDtls)
-
-	lb.lock.RUnlock()
-	if !exists {
+	key := balancerKey(service)
+	state, exists := lb.services[key]
+	if !exists || state == nil {
 		return "", ErrMissingServiceEntry
 	}
-	if len(endpoints) == 0 {
+	if len(state.endpoints) == 0 {
 		return "", ErrMissingEndpoints
 	}
+	glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints)
+
+	sessionAffinityEnabled := isSessionAffinity(&state.affinity)
+
+	var ipaddr string
 	if sessionAffinityEnabled {
-		if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil {
-			ipaddr, _, _ = net.SplitHostPort(srcAddr.String())
+		// Caution: don't shadow ipaddr
+		var err error
+		ipaddr, _, err = net.SplitHostPort(srcAddr.String())
+		if err != nil {
+			return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
 		}
-		sessionAffinity, exists := serviceDtls.affinityMap[ipaddr]
-		glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
-		if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < serviceDtls.ttlMinutes {
+		sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
+		if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
+			// Affinity wins.
 			endpoint := sessionAffinity.endpoint
 			sessionAffinity.lastUsed = time.Now()
-			glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
+			glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint)
 			return endpoint, nil
 		}
 	}
-	endpoint := endpoints[index]
-	lb.lock.Lock()
-	lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints)
+	// Take the next endpoint.
+	endpoint := state.endpoints[state.index]
+	state.index = (state.index + 1) % len(state.endpoints)

 	if sessionAffinityEnabled {
 		var affinity *affinityState
-		affinity, _ = lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr]
+		affinity = state.affinity.affinityMap[ipaddr]
 		if affinity == nil {
 			affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
-			lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] = affinity
+			state.affinity.affinityMap[ipaddr] = affinity
 		}
 		affinity.lastUsed = time.Now()
 		affinity.endpoint = endpoint
 		affinity.clientIP = ipaddr
-
-		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr])
+		glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr])
 	}
-	lb.lock.Unlock()

 	return endpoint, nil
 }
@@ -175,12 +184,12 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
 	return result
 }

-//remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
-func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) {
-	for _, affinity := range lb.serviceDtlMap[service].affinityMap {
+// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
+func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) {
+	for _, affinity := range state.affinity.affinityMap {
 		if affinity.endpoint == endpoint {
-			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s", affinity.endpoint, service)
-			delete(lb.serviceDtlMap[service].affinityMap, affinity.clientIP)
+			glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
+			delete(state.affinity.affinityMap, affinity.clientIP)
 		}
 	}
 }
@@ -188,19 +197,22 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, en
 // Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
 // Then remove any session affinity records that are not in both lists.
 // This assumes the lb.lock is held.
-func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) {
+func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) {
 	allEndpoints := map[string]int{}
-	for _, validEndpoint := range newEndpoints {
-		allEndpoints[validEndpoint] = 1
+	for _, newEndpoint := range newEndpoints {
+		allEndpoints[newEndpoint] = 1
 	}
-	for _, existingEndpoint := range lb.endpointsMap[service] {
+	state, exists := lb.services[service]
+	if !exists {
+		return
+	}
+	for _, existingEndpoint := range state.endpoints {
 		allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1
 	}
 	for mKey, mVal := range allEndpoints {
 		if mVal == 1 {
-			glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service)
-			removeSessionAffinityByEndpoint(lb, service, mKey)
-			delete(lb.serviceDtlMap[service].affinityMap, mKey)
+			glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service)
+			removeSessionAffinityByEndpoint(state, service, mKey)
 		}
 	}
 }
@@ -212,43 +224,64 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
 	registeredEndpoints := make(map[balancerKey]bool)
 	lb.lock.Lock()
 	defer lb.lock.Unlock()
+
 	// Update endpoints for services.
 	for _, svcEndpoints := range allEndpoints {
-		curEndpoints, exists := lb.endpointsMap[balancerKey(svcEndpoints.Name)]
+		key := balancerKey(svcEndpoints.Name)
+		state, exists := lb.services[key]
+		curEndpoints := []string{}
+		if state != nil {
+			curEndpoints = state.endpoints
+		}
 		newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints)
-		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(curEndpoints)), slice.SortStrings(newEndpoints)) {
+		if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
 			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
-			updateAffinityMap(lb, balancerKey(svcEndpoints.Name), newEndpoints)
+			lb.updateAffinityMap(key, newEndpoints)
 			// On update can be called without NewService being called externally.
-			// to be safe we will call it here. A new service will only be created
+			// To be safe we will call it here. A new service will only be created
 			// if one does not already exist.
-			lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
-			lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints)
+			state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
+			state.endpoints = slice.ShuffleStrings(newEndpoints)
 			// Reset the round-robin index.
-			lb.rrIndex[balancerKey(svcEndpoints.Name)] = 0
+			state.index = 0
 		}
-		registeredEndpoints[balancerKey(svcEndpoints.Name)] = true
+		registeredEndpoints[key] = true
 	}
 	// Remove endpoints missing from the update.
-	for k, v := range lb.endpointsMap {
+	for k := range lb.services {
 		if _, exists := registeredEndpoints[k]; !exists {
-			glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v)
-			delete(lb.endpointsMap, k)
-			delete(lb.serviceDtlMap, k)
+			glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k)
+			delete(lb.services, k)
 		}
 	}
 }

+// Tests whether two slices are equivalent. This sorts both slices in-place.
+func slicesEquiv(lhs, rhs []string) bool {
+	if len(lhs) != len(rhs) {
+		return false
+	}
+	if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
+		return true
+	}
+	return false
+}
+
 func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
 	lb.lock.Lock()
 	defer lb.lock.Unlock()

-	ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes
-	for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap {
-		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes {
-			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s. Last used is greater than %d minutes....", affinity.clientIP, service, ttlMinutes)
-			delete(lb.serviceDtlMap[balancerKey(service)].affinityMap, ip)
+	key := balancerKey(service)
+	state, exists := lb.services[key]
+	if !exists {
+		glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)
+		return
+	}
+	for ip, affinity := range state.affinity.affinityMap {
+		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes {
+			glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service)
+			delete(state.affinity.affinityMap, ip)
 		}
 	}
 }
diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go
index 3298aa92bd5..c47e07eab06 100644
--- a/pkg/proxy/roundrobin_test.go
+++ b/pkg/proxy/roundrobin_test.go
@@ -119,7 +119,7 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
@@ -142,7 +142,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
@@ -157,7 +157,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
@@ -195,14 +195,14 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)

-	shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
@@ -265,7 +265,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@@ -297,7 +297,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2)
@@ -332,7 +332,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	client1Endpoint := shuffledEndpoints[0]
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@@ -350,7 +350,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	if client1Endpoint == "endpoint:3" {
 		client1Endpoint = shuffledEndpoints[0]
 	} else if client2Endpoint == "endpoint:3" {
@@ -371,7 +371,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
 	expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
 	expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
@@ -401,7 +401,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@@ -418,7 +418,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@@ -463,7 +463,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
@@ -471,7 +471,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)

-	shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
@@ -487,7 +487,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 	}

 	// but bar is still there, and we continue RR from where we left off.
-	shuffledBarEndpoints = loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints = loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)