mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Fix unprotected shared state in load balancer
This commit is contained in:
parent
e88134f736
commit
8503c34694
@ -77,6 +77,15 @@ func NewLoadBalancerRR() *LoadBalancerRR {
|
||||
}
|
||||
|
||||
func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
|
||||
lb.lock.Lock()
|
||||
defer lb.lock.Unlock()
|
||||
|
||||
lb.newServiceInternal(service, affinityType, ttlMinutes)
|
||||
return nil
|
||||
}
|
||||
|
||||
// This assumes that lb.lock is already held.
|
||||
func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) {
|
||||
if ttlMinutes == 0 {
|
||||
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
|
||||
}
|
||||
@ -84,7 +93,6 @@ func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityTy
|
||||
lb.serviceDtlMap[balancerKey(service)] = *newAffinityPolicy(affinityType, ttlMinutes)
|
||||
glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[balancerKey(service)])
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// return true if this service is using some form of session affinity.
|
||||
@ -177,8 +185,9 @@ func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service balancerKey, en
|
||||
}
|
||||
}
|
||||
|
||||
//Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
|
||||
// Then remove any session affinity records that are not in both lists.
|
||||
// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
|
||||
// Then remove any session affinity records that are not in both lists.
|
||||
// This assumes the lb.lock is held.
|
||||
func updateAffinityMap(lb *LoadBalancerRR, service balancerKey, newEndpoints []string) {
|
||||
allEndpoints := map[string]int{}
|
||||
for _, validEndpoint := range newEndpoints {
|
||||
@ -213,7 +222,7 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
|
||||
// On update can be called without NewService being called externally.
|
||||
// to be safe we will call it here. A new service will only be created
|
||||
// if one does not already exist.
|
||||
lb.NewService(svcEndpoints.Name, api.AffinityTypeNone, 0)
|
||||
lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
|
||||
lb.endpointsMap[balancerKey(svcEndpoints.Name)] = slice.ShuffleStrings(newEndpoints)
|
||||
|
||||
// Reset the round-robin index.
|
||||
@ -232,6 +241,9 @@ func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
|
||||
}
|
||||
|
||||
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
|
||||
lb.lock.Lock()
|
||||
defer lb.lock.Unlock()
|
||||
|
||||
ttlMinutes := lb.serviceDtlMap[balancerKey(service)].ttlMinutes
|
||||
for ip, affinity := range lb.serviceDtlMap[balancerKey(service)].affinityMap {
|
||||
if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= ttlMinutes {
|
||||
|
Loading…
Reference in New Issue
Block a user