From e88134f7368a9ca95a803cf3ec2ee917a2c258e5 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Sat, 21 Feb 2015 21:34:50 -0800 Subject: [PATCH] Some renames in round-robin balancer Making a clear path for more service port changes,making this code more comprehensible. --- pkg/proxy/roundrobin.go | 114 ++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 58 deletions(-) diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index cc3926478d0..07d4688f20d 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -34,19 +34,18 @@ var ( ErrMissingEndpoints = errors.New("missing endpoints") ) -type sessionAffinityDetail struct { - clientIPAddress string +type affinityState struct { + clientIP string //clientProtocol api.Protocol //not yet used //sessionCookie string //not yet used - endpoint string - lastUsedDTTM time.Time + endpoint string + lastUsed time.Time } -type serviceDetail struct { - name string - sessionAffinityType api.AffinityType - sessionAffinityMap map[string]*sessionAffinityDetail - stickyMaxAgeMinutes int +type affinityPolicy struct { + affinityType api.AffinityType + affinityMap map[string]*affinityState // map client IP -> affinity info + ttlMinutes int } // balancerKey is a string that the balancer uses to key stored state. @@ -57,15 +56,14 @@ type LoadBalancerRR struct { lock sync.RWMutex endpointsMap map[balancerKey][]string rrIndex map[balancerKey]int - serviceDtlMap map[balancerKey]serviceDetail + serviceDtlMap map[balancerKey]affinityPolicy } -func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail { - return &serviceDetail{ - name: service, - sessionAffinityType: sessionAffinityType, - sessionAffinityMap: make(map[string]*sessionAffinityDetail), - stickyMaxAgeMinutes: stickyMaxAgeMinutes, +func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy { + return &affinityPolicy{ + affinityType: affinityType, + affinityMap: make(map[string]*affinityState), + ttlMinutes: ttlMinutes, } } @@ -74,25 +72,25 @@ func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ endpointsMap: make(map[balancerKey][]string), rrIndex: make(map[balancerKey]int), - serviceDtlMap: make(map[balancerKey]serviceDetail), + serviceDtlMap: make(map[balancerKey]affinityPolicy), } } -func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error { - if stickyMaxAgeMinutes == 0 { - stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? +func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error { + if ttlMinutes == 0 { + ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? } if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists { - lb.serviceDtlMap[balancerKey(service)] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes) + lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes) glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)]) } return nil } -// return true if this service detail is using some form of session affinity. -func isSessionAffinity(serviceDtl serviceDetail) bool { +// return true if this service is using some form of session affinity. +func isSessionAffinity(affinity *affinityPolicy) bool { //Should never be empty string, but chekcing for it to be safe. - if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone { + if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone { return false } return true @@ -108,7 +106,7 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)] endpoints, _ := lb.endpointsMap[balancerKey(service)] index := lb.rrIndex[balancerKey(service)] - sessionAffinityEnabled := isSessionAffinity(serviceDtls) + sessionAffinityEnabled := isSessionAffinity(&serviceDtls) lb.lock.RUnlock() if !exists { @@ -121,11 +119,11 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) } - sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr] + sessionAffinity, exists := serviceDtls.affinityMap[ipaddr] glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) - if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes { + if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < serviceDtls.ttlMinutes { endpoint := sessionAffinity.endpoint - sessionAffinity.lastUsedDTTM = time.Now() + sessionAffinity.lastUsed = time.Now() glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) return endpoint, nil } @@ -135,17 +133,17 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints) if sessionAffinityEnabled { - var affinity *sessionAffinityDetail - affinity, _ = lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] + var affinity *affinityState + affinity, _ = lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] if affinity == nil { - affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()} - lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] = affinity + affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()} + lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] = affinity } - affinity.lastUsedDTTM = time.Now() + affinity.lastUsed = time.Now() affinity.endpoint = endpoint - affinity.clientIPAddress = ipaddr + affinity.clientIP = ipaddr - glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr]) + glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr]) } lb.lock.Unlock() @@ -171,19 +169,19 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string { //remove any session affinity records associated to a particular endpoint (for example when a pod goes down). func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) { - for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { - if affinityDetail.endpoint == endpoint { - glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service) - delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress) + for _, affinity := range lb.serviceDtlMap[service].affinityMap { + if affinity.endpoint == endpoint { + glog.V(4).Infof("Removing client: %s from affinityMap for service: %s", affinity.endpoint, service) + delete(lb.serviceDtlMap[service].affinityMap, affinity.clientIP) } } } //Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Then remove any session affinity records that are not in both lists. -func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoints []string) { +func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) { allEndpoints := map[string]int{} - for _, validEndpoint := range validEndpoints { + for _, validEndpoint := range newEndpoints { allEndpoints[validEndpoint] = 1 } for _, existingEndpoint := range lb.endpointsMap[service] { @@ -193,7 +191,7 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoi if mVal == 1 { glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) removeSessionAffinityByEndpoint(lb, service, mKey) - delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey) + delete(lb.serviceDtlMap[service].affinityMap, mKey) } } } @@ -201,27 +199,27 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoi // OnUpdate manages the registered service endpoints. // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { +func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { registeredEndpoints := make(map[balancerKey]bool) lb.lock.Lock() defer lb.lock.Unlock() // Update endpoints for services. - for _, endpoint := range endpoints { - existingEndpoints, exists := lb.endpointsMap[balancerKey(endpoint.Name)] - validEndpoints := filterValidEndpoints(endpoint.Endpoints) - if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) { - glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints) - updateServiceDetailMap(lb, balancerKey(endpoint.Name), validEndpoints) + for _, svcEndpoints := range allEndpoints { + curEndpoints, exists := lb.endpointsMap[balancerKey(svcEndpoints.Name)] + newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints) + if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(curEndpoints)), slice.SortStrings(newEndpoints)) { + glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints) + updateAffinityMap(lb, balancerKey(svcEndpoints.Name), newEndpoints) // On update can be called without NewService being called externally. // to be safe we will call it here. A new service will only be created // if one does not already exist. - lb.NewService(endpoint.Name, api.AffinityTypeNone, 0) - lb.endpointsMap[balancerKey(endpoint.Name)] = slice.ShuffleStrings(validEndpoints) + lb.NewService(svcEndpoints.Name, api.AffinityTypeNone, 0) + lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints) // Reset the round-robin index. - lb.rrIndex[balancerKey(endpoint.Name)] = 0 + lb.rrIndex[balancerKey(svcEndpoints.Name)] = 0 } - registeredEndpoints[balancerKey(endpoint.Name)] = true + registeredEndpoints[balancerKey(svcEndpoints.Name)] = true } // Remove endpoints missing from the update. for k, v := range lb.endpointsMap { @@ -234,11 +232,11 @@ func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { } func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { - stickyMaxAgeMinutes := lb.serviceDtlMap[balancerKey(service)].stickyMaxAgeMinutes - for key, affinityDetail := range lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap { - if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes { - glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes) - delete(lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap, key) + ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes + for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap { + if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes { + glog.V(4).Infof("Removing client: %s from affinityMap for service: %s. Last used is greater than %d minutes....", affinity.clientIP, service, ttlMinutes) + delete(lb.serviceDtlMap[balancerKey(service)].affinityMap, ip) } } }