diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go
index 070c82ee70f..1afc9f2762c 100644
--- a/pkg/proxy/roundrobin.go
+++ b/pkg/proxy/roundrobin.go
@@ -18,6 +18,7 @@ package proxy
 
 import (
 	"errors"
+	"fmt"
 	"net"
 	"reflect"
 	"strconv"
@@ -34,62 +35,76 @@ var (
 	ErrMissingEndpoints = errors.New("missing endpoints")
 )
 
-type sessionAffinityDetail struct {
-	clientIPAddress string
+type affinityState struct {
+	clientIP string
 	//clientProtocol api.Protocol //not yet used
 	//sessionCookie  string       //not yet used
-	endpoint     string
-	lastUsedDTTM time.Time
+	endpoint string
+	lastUsed time.Time
 }
 
-type serviceDetail struct {
-	name                string
-	sessionAffinityType api.AffinityType
-	sessionAffinityMap  map[string]*sessionAffinityDetail
-	stickyMaxAgeMinutes int
+type affinityPolicy struct {
+	affinityType api.AffinityType
+	affinityMap  map[string]*affinityState // map client IP -> affinity info
+	ttlMinutes   int
 }
 
+// balancerKey is a string that the balancer uses to key stored state.
+type balancerKey string
+
 // LoadBalancerRR is a round-robin load balancer.
 type LoadBalancerRR struct {
-	lock          sync.RWMutex
-	endpointsMap  map[string][]string
-	rrIndex       map[string]int
-	serviceDtlMap map[string]serviceDetail
+	lock     sync.RWMutex
+	services map[balancerKey]*balancerState
 }
 
-func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail {
-	return &serviceDetail{
-		name:                service,
-		sessionAffinityType: sessionAffinityType,
-		sessionAffinityMap:  make(map[string]*sessionAffinityDetail),
-		stickyMaxAgeMinutes: stickyMaxAgeMinutes,
+type balancerState struct {
+	endpoints []string
+	index     int
+	affinity  affinityPolicy
+}
+
+func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy {
+	return &affinityPolicy{
+		affinityType: affinityType,
+		affinityMap:  make(map[string]*affinityState),
+		ttlMinutes:   ttlMinutes,
 	}
 }
 
 // NewLoadBalancerRR returns a new LoadBalancerRR.
 func NewLoadBalancerRR() *LoadBalancerRR {
 	return &LoadBalancerRR{
-		endpointsMap:  make(map[string][]string),
-		rrIndex:       make(map[string]int),
-		serviceDtlMap: make(map[string]serviceDetail),
+		services: map[balancerKey]*balancerState{},
 	}
 }
 
-func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error {
-	if stickyMaxAgeMinutes == 0 {
-		stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
-	}
-	if _, exists := lb.serviceDtlMap[service]; !exists {
-		lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes)
-		glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service])
-	}
+func (lb *LoadBalancerRR) NewService(service string, affinityType api.AffinityType, ttlMinutes int) error {
+	lb.lock.Lock()
+	defer lb.lock.Unlock()
+
+	lb.newServiceInternal(service, affinityType, ttlMinutes)
 	return nil
 }
 
-// return true if this service detail is using some form of session affinity.
-func isSessionAffinity(serviceDtl serviceDetail) bool {
-	//Should never be empty string, but chekcing for it to be safe.
-	if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone {
+// This assumes that lb.lock is already held.
+func (lb *LoadBalancerRR) newServiceInternal(service string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
+	if ttlMinutes == 0 {
+		ttlMinutes = 180 // default to 3 hours if not specified; should 0 be unlimited instead?
+	}
+
+	key := balancerKey(service)
+	if _, exists := lb.services[key]; !exists {
+		lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
+		glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
+	}
+	return lb.services[key]
+}
+
+// return true if this service is using some form of session affinity.
+func isSessionAffinity(affinity *affinityPolicy) bool {
+	// Should never be empty string, but checking for it to be safe.
+	if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone {
 		return false
 	}
 	return true
@@ -98,54 +113,57 @@ func isSessionAffinity(serviceDtl serviceDetail) bool {
 // NextEndpoint returns a service endpoint.
 // The service endpoint is chosen using the round-robin algorithm.
 func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
-	var ipaddr string
-	glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap)
+	// Coarse locking is simple. We can get more fine-grained if/when we
+	// can prove it matters.
+	lb.lock.Lock()
+	defer lb.lock.Unlock()
 
-	lb.lock.RLock()
-	serviceDtls, exists := lb.serviceDtlMap[service]
-	endpoints, _ := lb.endpointsMap[service]
-	index := lb.rrIndex[service]
-	sessionAffinityEnabled := isSessionAffinity(serviceDtls)
-
-	lb.lock.RUnlock()
-	if !exists {
+	key := balancerKey(service)
+	state, exists := lb.services[key]
+	if !exists || state == nil {
 		return "", ErrMissingServiceEntry
 	}
-	if len(endpoints) == 0 {
+	if len(state.endpoints) == 0 {
 		return "", ErrMissingEndpoints
 	}
+	glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints)
+
+	sessionAffinityEnabled := isSessionAffinity(&state.affinity)
+
+	var ipaddr string
 	if sessionAffinityEnabled {
-		if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil {
-			ipaddr, _, _ = net.SplitHostPort(srcAddr.String())
+		// Caution: don't shadow ipaddr
+		var err error
+		ipaddr, _, err = net.SplitHostPort(srcAddr.String())
+		if err != nil {
+			return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
 		}
-		sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr]
-		glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
-		if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes {
+		sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
+		if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
+			// Affinity wins.
 			endpoint := sessionAffinity.endpoint
-			sessionAffinity.lastUsedDTTM = time.Now()
-			glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
+			sessionAffinity.lastUsed = time.Now()
+			glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint)
 			return endpoint, nil
 		}
 	}
-	endpoint := endpoints[index]
-	lb.lock.Lock()
-	lb.rrIndex[service] = (index + 1) % len(endpoints)
+	// Take the next endpoint.
+	endpoint := state.endpoints[state.index]
+	state.index = (state.index + 1) % len(state.endpoints)
 
 	if sessionAffinityEnabled {
-		var affinity *sessionAffinityDetail
-		affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr]
+		var affinity *affinityState
+		affinity = state.affinity.affinityMap[ipaddr]
 		if affinity == nil {
-			affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()}
-			lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity
+			affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
+			state.affinity.affinityMap[ipaddr] = affinity
 		}
-		affinity.lastUsedDTTM = time.Now()
+		affinity.lastUsed = time.Now()
 		affinity.endpoint = endpoint
-		affinity.clientIPAddress = ipaddr
-
-		glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr])
+		affinity.clientIP = ipaddr
+		glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr])
 	}
 
-	lb.lock.Unlock()
 	return endpoint, nil
 }
 
@@ -166,31 +184,35 @@ func filterValidEndpoints(endpoints []api.Endpoint) []string {
 	return result
 }
 
-//remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
-func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) {
-	for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap {
-		if affinityDetail.endpoint == endpoint {
-			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service)
-			delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress)
+// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
+func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) {
+	for _, affinity := range state.affinity.affinityMap {
+		if affinity.endpoint == endpoint {
+			glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
+			delete(state.affinity.affinityMap, affinity.clientIP)
 		}
 	}
 }
 
-//Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
-// Then remove any session affinity records that are not in both lists.
-func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) {
+// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
+// Then remove any session affinity records that are not in both lists.
+// This assumes the lb.lock is held.
+func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) {
 	allEndpoints := map[string]int{}
-	for _, validEndpoint := range validEndpoints {
-		allEndpoints[validEndpoint] = 1
+	for _, newEndpoint := range newEndpoints {
+		allEndpoints[newEndpoint] = 1
 	}
-	for _, existingEndpoint := range lb.endpointsMap[service] {
+	state, exists := lb.services[service]
+	if !exists {
+		return
+	}
+	for _, existingEndpoint := range state.endpoints {
 		allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1
 	}
 	for mKey, mVal := range allEndpoints {
 		if mVal == 1 {
-			glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service)
-			removeSessionAffinityByEndpoint(lb, service, mKey)
-			delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey)
+			glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service)
+			removeSessionAffinityByEndpoint(state, service, mKey)
 		}
 	}
 }
@@ -198,44 +220,68 @@ func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints [
 // OnUpdate manages the registered service endpoints.
 // Registered endpoints are updated if found in the update set or
 // unregistered if missing from the update set.
-func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
-	registeredEndpoints := make(map[string]bool)
+func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
+	registeredEndpoints := make(map[balancerKey]bool)
 	lb.lock.Lock()
 	defer lb.lock.Unlock()
+
 	// Update endpoints for services.
-	for _, endpoint := range endpoints {
-		existingEndpoints, exists := lb.endpointsMap[endpoint.Name]
-		validEndpoints := filterValidEndpoints(endpoint.Endpoints)
-		if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) {
-			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints)
-			updateServiceDetailMap(lb, endpoint.Name, validEndpoints)
+	for _, svcEndpoints := range allEndpoints {
+		key := balancerKey(svcEndpoints.Name)
+		state, exists := lb.services[key]
+		curEndpoints := []string{}
+		if state != nil {
+			curEndpoints = state.endpoints
+		}
+		newEndpoints := filterValidEndpoints(svcEndpoints.Endpoints)
+		if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
+			glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
+			lb.updateAffinityMap(key, newEndpoints)
 			// On update can be called without NewService being called externally.
-			// to be safe we will call it here. A new service will only be created
+			// To be safe we will call it here. A new service will only be created
 			// if one does not already exist.
-			lb.NewService(endpoint.Name, api.AffinityTypeNone, 0)
-			lb.endpointsMap[endpoint.Name] = slice.ShuffleStrings(validEndpoints)
+			state = lb.newServiceInternal(svcEndpoints.Name, api.AffinityTypeNone, 0)
+			state.endpoints = slice.ShuffleStrings(newEndpoints)
 			// Reset the round-robin index.
-			lb.rrIndex[endpoint.Name] = 0
+			state.index = 0
 		}
-		registeredEndpoints[endpoint.Name] = true
+		registeredEndpoints[key] = true
 	}
 	// Remove endpoints missing from the update.
-	for k, v := range lb.endpointsMap {
+	for k := range lb.services {
 		if _, exists := registeredEndpoints[k]; !exists {
-			glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v)
-			delete(lb.endpointsMap, k)
-			delete(lb.serviceDtlMap, k)
+			glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k)
+			delete(lb.services, k)
 		}
 	}
 }
 
+// Tests whether two slices are equivalent. This sorts both slices in-place.
+func slicesEquiv(lhs, rhs []string) bool {
+	if len(lhs) != len(rhs) {
+		return false
+	}
+	if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
+		return true
+	}
+	return false
+}
+
 func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
-	stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes
-	for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap {
-		if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes {
-			glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes)
-			delete(lb.serviceDtlMap[service].sessionAffinityMap, key)
+	lb.lock.Lock()
+	defer lb.lock.Unlock()
+
+	key := balancerKey(service)
+	state, exists := lb.services[key]
+	if !exists {
+		glog.Warningf("CleanupStaleStickySessions called for non-existent balancer key %q", service)
+		return
+	}
+	for ip, affinity := range state.affinity.affinityMap {
+		if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes {
+			glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service)
+			delete(state.affinity.affinityMap, ip)
 		}
 	}
 }
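Reviewer note, not part of the patch: the control flow NextEndpoint now implements (reuse a fresh affinity record if one exists, otherwise advance round-robin and remember the choice) is easy to check in isolation. Below is a minimal, self-contained Go sketch of that pattern; all names are illustrative and none of the Kubernetes types are involved.

package main

import (
	"fmt"
	"time"
)

// affinity remembers which endpoint a client was last sent to.
type affinity struct {
	endpoint string
	lastUsed time.Time
}

// balancer is a stripped-down analogue of balancerState plus affinityPolicy.
type balancer struct {
	endpoints []string
	index     int
	ttl       time.Duration
	byClient  map[string]*affinity
}

// next mirrors the patch's selection flow: a fresh affinity entry wins,
// otherwise round-robin advances and the choice is remembered.
func (b *balancer) next(clientIP string) string {
	if a, ok := b.byClient[clientIP]; ok && time.Since(a.lastUsed) < b.ttl {
		a.lastUsed = time.Now()
		return a.endpoint
	}
	ep := b.endpoints[b.index]
	b.index = (b.index + 1) % len(b.endpoints)
	b.byClient[clientIP] = &affinity{endpoint: ep, lastUsed: time.Now()}
	return ep
}

func main() {
	b := &balancer{
		endpoints: []string{"10.0.0.1:80", "10.0.0.2:80"},
		ttl:       3 * time.Hour, // the patch's 180-minute default
		byClient:  map[string]*affinity{},
	}
	fmt.Println(b.next("1.2.3.4")) // 10.0.0.1:80 (round-robin)
	fmt.Println(b.next("1.2.3.4")) // 10.0.0.1:80 (affinity wins)
	fmt.Println(b.next("5.6.7.8")) // 10.0.0.2:80 (round-robin advances)
}

The patch's real version additionally holds lb.lock for the whole operation, derives the client key with net.SplitHostPort, and expires stale records via CleanupStaleStickySessions.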
diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go
index 3298aa92bd5..c47e07eab06 100644
--- a/pkg/proxy/roundrobin_test.go
+++ b/pkg/proxy/roundrobin_test.go
@@ -119,7 +119,7 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
@@ -142,7 +142,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
@@ -157,7 +157,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
@@ -195,14 +195,14 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
 
-	shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
@@ -265,7 +265,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@@ -297,7 +297,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2)
@@ -332,7 +332,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	client1Endpoint := shuffledEndpoints[0]
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@@ -350,7 +350,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	if client1Endpoint == "endpoint:3" {
 		client1Endpoint = shuffledEndpoints[0]
 	} else if client2Endpoint == "endpoint:3" {
@@ -371,7 +371,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
 	expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
 	expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
@@ -401,7 +401,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
@@ -418,7 +418,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledEndpoints = loadBalancer.endpointsMap["foo"]
+	shuffledEndpoints = loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
@@ -463,7 +463,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 		},
 	}
 	loadBalancer.OnUpdate(endpoints)
-	shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
+	shuffledFooEndpoints := loadBalancer.services["foo"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
@@ -471,7 +471,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 
-	shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints := loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
@@ -487,7 +487,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
 	}
 
 	// but bar is still there, and we continue RR from where we left off.
-	shuffledBarEndpoints = loadBalancer.endpointsMap["bar"]
+	shuffledBarEndpoints = loadBalancer.services["bar"].endpoints
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
 	expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
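Reviewer note, not part of the patch: slicesEquiv sorts both of its arguments in place, which is why OnUpdate compares slice.CopyStrings(curEndpoints) rather than the live state.endpoints (the shuffled order must survive the comparison), while the freshly built newEndpoints slice can be passed in directly. A standalone sketch of that behavior, with the standard library standing in for the k8s slice util:

package main

import (
	"fmt"
	"reflect"
	"sort"
)

// slicesEquiv has the same shape as the patch's helper, with sort.Strings
// standing in for slice.SortStrings. Note that it mutates both arguments.
func slicesEquiv(lhs, rhs []string) bool {
	if len(lhs) != len(rhs) {
		return false
	}
	sort.Strings(lhs)
	sort.Strings(rhs)
	return reflect.DeepEqual(lhs, rhs)
}

func main() {
	live := []string{"b:80", "a:80"}   // imagine this is state.endpoints (shuffled)
	update := []string{"a:80", "b:80"} // freshly built, safe to sort in place

	cp := append([]string(nil), live...) // compare a copy, as OnUpdate does
	fmt.Println(slicesEquiv(cp, update)) // true
	fmt.Println(live)                    // [b:80 a:80] - shuffled order preserved
}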