mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-10-30 21:30:16 +00:00 
			
		
		
		
	Some renames in round-robin balancer
Making a clear path for more service port changes,making this code more comprehensible.
This commit is contained in:
		| @@ -34,19 +34,18 @@ var ( | |||||||
| 	ErrMissingEndpoints    = errors.New("missing endpoints") | 	ErrMissingEndpoints    = errors.New("missing endpoints") | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type sessionAffinityDetail struct { | type affinityState struct { | ||||||
| 	clientIPAddress string | 	clientIP string | ||||||
| 	//clientProtocol  api.Protocol //not yet used | 	//clientProtocol  api.Protocol //not yet used | ||||||
| 	//sessionCookie   string       //not yet used | 	//sessionCookie   string       //not yet used | ||||||
| 	endpoint     string | 	endpoint string | ||||||
| 	lastUsedDTTM time.Time | 	lastUsed time.Time | ||||||
| } | } | ||||||
|  |  | ||||||
| type serviceDetail struct { | type affinityPolicy struct { | ||||||
| 	name                string | 	affinityType api.AffinityType | ||||||
| 	sessionAffinityType api.AffinityType | 	affinityMap  map[string]*affinityState // map client IP -> affinity info | ||||||
| 	sessionAffinityMap  map[string]*sessionAffinityDetail | 	ttlMinutes   int | ||||||
| 	stickyMaxAgeMinutes int |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // balancerKey is a string that the balancer uses to key stored state. | // balancerKey is a string that the balancer uses to key stored state. | ||||||
| @@ -57,15 +56,14 @@ type LoadBalancerRR struct { | |||||||
| 	lock          sync.RWMutex | 	lock          sync.RWMutex | ||||||
| 	endpointsMap  map[balancerKey][]string | 	endpointsMap  map[balancerKey][]string | ||||||
| 	rrIndex       map[balancerKey]int | 	rrIndex       map[balancerKey]int | ||||||
| 	serviceDtlMap map[balancerKey]serviceDetail | 	serviceDtlMap map[balancerKey]affinityPolicy | ||||||
| } | } | ||||||
|  |  | ||||||
| func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail { | func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy { | ||||||
| 	return &serviceDetail{ | 	return &affinityPolicy{ | ||||||
| 		name:                service, | 		affinityType: affinityType, | ||||||
| 		sessionAffinityType: sessionAffinityType, | 		affinityMap:  make(map[string]*affinityState), | ||||||
| 		sessionAffinityMap:  make(map[string]*sessionAffinityDetail), | 		ttlMinutes:   ttlMinutes, | ||||||
| 		stickyMaxAgeMinutes: stickyMaxAgeMinutes, |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -74,25 +72,25 @@ func NewLoadBalancerRR() *LoadBalancerRR { | |||||||
| 	return &LoadBalancerRR{ | 	return &LoadBalancerRR{ | ||||||
| 		endpointsMap:  make(map[balancerKey][]string), | 		endpointsMap:  make(map[balancerKey][]string), | ||||||
| 		rrIndex:       make(map[balancerKey]int), | 		rrIndex:       make(map[balancerKey]int), | ||||||
| 		serviceDtlMap: make(map[balancerKey]serviceDetail), | 		serviceDtlMap: make(map[balancerKey]affinityPolicy), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error { | func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error { | ||||||
| 	if stickyMaxAgeMinutes == 0 { | 	if ttlMinutes == 0 { | ||||||
| 		stickyMaxAgeMinutes = 180 //default to 3 hours if not specified.  Should 0 be unlimeted instead???? | 		ttlMinutes = 180 //default to 3 hours if not specified.  Should 0 be unlimeted instead???? | ||||||
| 	} | 	} | ||||||
| 	if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists { | 	if _, exists := lb.serviceDtlMap[balancerKey(service)]; !exists { | ||||||
| 		lb.serviceDtlMap[balancerKey(service)] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes) | 		lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes) | ||||||
| 		glog.V(4).Infof("NewService.  Service does not exist.  So I created it: %+v", lb.serviceDtlMap[balancerKey(service)]) | 		glog.V(4).Infof("NewService.  Service does not exist.  So I created it: %+v", lb.serviceDtlMap[balancerKey(service)]) | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // return true if this service detail is using some form of session affinity. | // return true if this service is using some form of session affinity. | ||||||
| func isSessionAffinity(serviceDtl serviceDetail) bool { | func isSessionAffinity(affinity *affinityPolicy) bool { | ||||||
| 	//Should never be empty string, but chekcing for it to be safe. | 	//Should never be empty string, but chekcing for it to be safe. | ||||||
| 	if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone { | 	if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone { | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
| 	return true | 	return true | ||||||
| @@ -108,7 +106,7 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string | |||||||
| 	serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)] | 	serviceDtls, exists := lb.serviceDtlMap[balancerKey(service)] | ||||||
| 	endpoints, _ := lb.endpointsMap[balancerKey(service)] | 	endpoints, _ := lb.endpointsMap[balancerKey(service)] | ||||||
| 	index := lb.rrIndex[balancerKey(service)] | 	index := lb.rrIndex[balancerKey(service)] | ||||||
| 	sessionAffinityEnabled := isSessionAffinity(serviceDtls) | 	sessionAffinityEnabled := isSessionAffinity(&serviceDtls) | ||||||
|  |  | ||||||
| 	lb.lock.RUnlock() | 	lb.lock.RUnlock() | ||||||
| 	if !exists { | 	if !exists { | ||||||
| @@ -121,11 +119,11 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string | |||||||
| 		if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { | 		if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { | ||||||
| 			ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) | 			ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) | ||||||
| 		} | 		} | ||||||
| 		sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr] | 		sessionAffinity, exists := serviceDtls.affinityMap[ipaddr] | ||||||
| 		glog.V(4).Infof("NextEndpoint.  Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) | 		glog.V(4).Infof("NextEndpoint.  Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) | ||||||
| 		if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes { | 		if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < serviceDtls.ttlMinutes { | ||||||
| 			endpoint := sessionAffinity.endpoint | 			endpoint := sessionAffinity.endpoint | ||||||
| 			sessionAffinity.lastUsedDTTM = time.Now() | 			sessionAffinity.lastUsed = time.Now() | ||||||
| 			glog.V(4).Infof("NextEndpoint.  Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) | 			glog.V(4).Infof("NextEndpoint.  Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) | ||||||
| 			return endpoint, nil | 			return endpoint, nil | ||||||
| 		} | 		} | ||||||
| @@ -135,17 +133,17 @@ func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string | |||||||
| 	lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints) | 	lb.rrIndex[balancerKey(service)] = (index + 1) % len(endpoints) | ||||||
|  |  | ||||||
| 	if sessionAffinityEnabled { | 	if sessionAffinityEnabled { | ||||||
| 		var affinity *sessionAffinityDetail | 		var affinity *affinityState | ||||||
| 		affinity, _ = lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] | 		affinity, _ = lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] | ||||||
| 		if affinity == nil { | 		if affinity == nil { | ||||||
| 			affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()} | 			affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()} | ||||||
| 			lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr] = affinity | 			lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr] = affinity | ||||||
| 		} | 		} | ||||||
| 		affinity.lastUsedDTTM = time.Now() | 		affinity.lastUsed = time.Now() | ||||||
| 		affinity.endpoint = endpoint | 		affinity.endpoint = endpoint | ||||||
| 		affinity.clientIPAddress = ipaddr | 		affinity.clientIP = ipaddr | ||||||
|  |  | ||||||
| 		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap[ipaddr]) | 		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[balancerKey(service)].affinityMap[ipaddr]) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	lb.lock.Unlock() | 	lb.lock.Unlock() | ||||||
| @@ -171,19 +169,19 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string { | |||||||
|  |  | ||||||
| //remove any session affinity records associated to a particular endpoint (for example when a pod goes down). | //remove any session affinity records associated to a particular endpoint (for example when a pod goes down). | ||||||
| func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) { | func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, endpoint string) { | ||||||
| 	for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { | 	for _, affinity := range lb.serviceDtlMap[service].affinityMap { | ||||||
| 		if affinityDetail.endpoint == endpoint { | 		if affinity.endpoint == endpoint { | ||||||
| 			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service) | 			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s", affinity.endpoint, service) | ||||||
| 			delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress) | 			delete(lb.serviceDtlMap[service].affinityMap, affinity.clientIP) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| //Loop through the valid endpoints and then the endpoints associated with the Load Balancer. | //Loop through the valid endpoints and then the endpoints associated with the Load Balancer. | ||||||
| // 	Then remove any session affinity records that are not in both lists. | // 	Then remove any session affinity records that are not in both lists. | ||||||
| func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoints []string) { | func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) { | ||||||
| 	allEndpoints := map[string]int{} | 	allEndpoints := map[string]int{} | ||||||
| 	for _, validEndpoint := range validEndpoints { | 	for _, validEndpoint := range newEndpoints { | ||||||
| 		allEndpoints[validEndpoint] = 1 | 		allEndpoints[validEndpoint] = 1 | ||||||
| 	} | 	} | ||||||
| 	for _, existingEndpoint := range lb.endpointsMap[service] { | 	for _, existingEndpoint := range lb.endpointsMap[service] { | ||||||
| @@ -193,7 +191,7 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoi | |||||||
| 		if mVal == 1 { | 		if mVal == 1 { | ||||||
| 			glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) | 			glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) | ||||||
| 			removeSessionAffinityByEndpoint(lb, service, mKey) | 			removeSessionAffinityByEndpoint(lb, service, mKey) | ||||||
| 			delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey) | 			delete(lb.serviceDtlMap[service].affinityMap, mKey) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| @@ -201,27 +199,27 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service balancerKey, validEndpoi | |||||||
| // OnUpdate manages the registered service endpoints. | // OnUpdate manages the registered service endpoints. | ||||||
| // Registered endpoints are updated if found in the update set or | // Registered endpoints are updated if found in the update set or | ||||||
| // unregistered if missing from the update set. | // unregistered if missing from the update set. | ||||||
| func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { | func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { | ||||||
| 	registeredEndpoints := make(map[balancerKey]bool) | 	registeredEndpoints := make(map[balancerKey]bool) | ||||||
| 	lb.lock.Lock() | 	lb.lock.Lock() | ||||||
| 	defer lb.lock.Unlock() | 	defer lb.lock.Unlock() | ||||||
| 	// Update endpoints for services. | 	// Update endpoints for services. | ||||||
| 	for _, endpoint := range endpoints { | 	for _, svcEndpoints := range allEndpoints { | ||||||
| 		existingEndpoints, exists := lb.endpointsMap[balancerKey(endpoint.Name)] | 		curEndpoints, exists := lb.endpointsMap[balancerKey(svcEndpoints.Name)] | ||||||
| 		validEndpoints := filterValidEndpoints(endpoint.Endpoints) | 		newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints) | ||||||
| 		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) { | 		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(curEndpoints)), slice.SortStrings(newEndpoints)) { | ||||||
| 			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints) | 			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints) | ||||||
| 			updateServiceDetailMap(lb, balancerKey(endpoint.Name), validEndpoints) | 			updateAffinityMap(lb, balancerKey(svcEndpoints.Name), newEndpoints) | ||||||
| 			// On update can be called without NewService being called externally. | 			// On update can be called without NewService being called externally. | ||||||
| 			// to be safe we will call it here.  A new service will only be created | 			// to be safe we will call it here.  A new service will only be created | ||||||
| 			// if one does not already exist. | 			// if one does not already exist. | ||||||
| 			lb.NewService(endpoint.Name, api.AffinityTypeNone, 0) | 			lb.NewService(svcEndpoints.Name, api.AffinityTypeNone, 0) | ||||||
| 			lb.endpointsMap[balancerKey(endpoint.Name)] = slice.ShuffleStrings(validEndpoints) | 			lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints) | ||||||
|  |  | ||||||
| 			// Reset the round-robin index. | 			// Reset the round-robin index. | ||||||
| 			lb.rrIndex[balancerKey(endpoint.Name)] = 0 | 			lb.rrIndex[balancerKey(svcEndpoints.Name)] = 0 | ||||||
| 		} | 		} | ||||||
| 		registeredEndpoints[balancerKey(endpoint.Name)] = true | 		registeredEndpoints[balancerKey(svcEndpoints.Name)] = true | ||||||
| 	} | 	} | ||||||
| 	// Remove endpoints missing from the update. | 	// Remove endpoints missing from the update. | ||||||
| 	for k, v := range lb.endpointsMap { | 	for k, v := range lb.endpointsMap { | ||||||
| @@ -234,11 +232,11 @@ func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { | func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { | ||||||
| 	stickyMaxAgeMinutes := lb.serviceDtlMap[balancerKey(service)].stickyMaxAgeMinutes | 	ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes | ||||||
| 	for key, affinityDetail := range lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap { | 	for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap { | ||||||
| 		if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes { | 		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes { | ||||||
| 			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s.  Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes) | 			glog.V(4).Infof("Removing client: %s from affinityMap for service: %s.  Last used is greater than %d minutes....", affinity.clientIP, service, ttlMinutes) | ||||||
| 			delete(lb.serviceDtlMap[balancerKey(service)].sessionAffinityMap, key) | 			delete(lb.serviceDtlMap[balancerKey(service)].affinityMap, ip) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user