diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 070c82ee70f..cc3926478d0 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -49,12 +49,15 @@ type serviceDetail struct { stickyMaxAgeMinutes int } +// balancerKey is a string that the balancer uses to key stored state. +type balancerKey string + // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { lock sync.RWMutex - endpointsMap map[string][]string - rrIndex map[string]int - serviceDtlMap map[string]serviceDetail + endpointsMap map[balancerKey][]string + rrIndex map[balancerKey]int + serviceDtlMap map[balancerKey]serviceDetail } func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail { @@ -69,9 +72,9 @@ func newServiceDetail(service string, sessionAffinityType api.AffinityType, stic // NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ - endpointsMap: make(map[string][]string), - rrIndex: make(map[string]int), - serviceDtlMap: make(map[string]serviceDetail), + endpointsMap: make(map[balancerKey][]string), + rrIndex: make(map[balancerKey]int), + serviceDtlMap: make(map[balancerKey]serviceDetail), } } @@ -79,9 +82,9 @@ func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.Aff if stickyMaxAgeMinutes == 0 { stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? } - if _, exists := lb.serviceDtlMap[service]; !exists { - lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes) - glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service]) + if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists { + lb.serviceDtlMap[balancerKey(service)] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes) + glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)]) } return nil } @@ -102,9 +105,9 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap) lb.lock.RLock() - serviceDtls, exists := lb.serviceDtlMap[service] - endpoints, _ := lb.endpointsMap[service] - index := lb.rrIndex[service] + serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)] + endpoints, _ := lb.endpointsMap[balancerKey(service)] + index := lb.rrIndex[balancerKey(service)] sessionAffinityEnabled := isSessionAffinity(serviceDtls) lb.lock.RUnlock() @@ -129,20 +132,20 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string } endpoint := endpoints[index] lb.lock.Lock() - lb.rrIndex[service] = (index + 1) % len(endpoints) + lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints) if sessionAffinityEnabled { var affinity *sessionAffinityDetail - affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] + affinity, _ = lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] if affinity == nil { affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()} - lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity + lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] = affinity } affinity.lastUsedDTTM = time.Now() affinity.endpoint = endpoint affinity.clientIPAddress = ipaddr - glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr]) + glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr]) } lb.lock.Unlock() @@ -167,7 +170,7 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string { } //remove any session affinity records associated to a particular endpoint (for example when a pod goes down). -func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) { +func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) { for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { if affinityDetail.endpoint == endpoint { glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service) @@ -178,7 +181,7 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoin //Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Then remove any session affinity records that are not in both lists. -func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) { +func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoints []string) { allEndpoints := map[string]int{} for _, validEndpoint := range validEndpoints { allEndpoints[validEndpoint] = 1 @@ -199,26 +202,26 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints [ // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { - registeredEndpoints := make(map[string]bool) + registeredEndpoints := make(map[balancerKey]bool) lb.lock.Lock() defer lb.lock.Unlock() // Update endpoints for services. for _, endpoint := range endpoints { - existingEndpoints, exists := lb.endpointsMap[endpoint.Name] + existingEndpoints, exists := lb.endpointsMap[balancerKey(endpoint.Name)] validEndpoints := filterValidEndpoints(endpoint.Endpoints) if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) { glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints) - updateServiceDetailMap(lb, endpoint.Name, validEndpoints) + updateServiceDetailMap(lb, balancerKey(endpoint.Name), validEndpoints) // On update can be called without NewService being called externally. // to be safe we will call it here. A new service will only be created // if one does not already exist. lb.NewService(endpoint.Name, api.AffinityTypeNone, 0) - lb.endpointsMap[endpoint.Name] = slice.ShuffleStrings(validEndpoints) + lb.endpointsMap[balancerKey(endpoint.Name)] = slice.ShuffleStrings(validEndpoints) // Reset the round-robin index. - lb.rrIndex[endpoint.Name] = 0 + lb.rrIndex[balancerKey(endpoint.Name)] = 0 } - registeredEndpoints[endpoint.Name] = true + registeredEndpoints[balancerKey(endpoint.Name)] = true } // Remove endpoints missing from the update. for k, v := range lb.endpointsMap { @@ -231,11 +234,11 @@ func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { } func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { - stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes - for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { + stickyMaxAgeMinutes := lb.serviceDtlMap[balancerKey(service)].stickyMaxAgeMinutes + for key, affinityDetail := range lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap { if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes { glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes) - delete(lb.serviceDtlMap[service].sessionAffinityMap, key) + delete(lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap, key) } } }