diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index 33c16f41292..29259a4bc3a 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -26,6 +26,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/util" @@ -201,22 +202,20 @@ func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.Service // Loop through the valid endpoints and then the endpoints associated with the Load Balancer. // Then remove any session affinity records that are not in both lists. // This assumes the lb.lock is held. -func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEndpoints []string) { - allEndpoints := map[string]int{} +func (lb *LoadBalancerRR) removeStaleAffinity(svcPort proxy.ServicePortName, newEndpoints []string) { + newEndpointsSet := sets.NewString() for _, newEndpoint := range newEndpoints { - allEndpoints[newEndpoint] = 1 + newEndpointsSet.Insert(newEndpoint) } + state, exists := lb.services[svcPort] if !exists { return } for _, existingEndpoint := range state.endpoints { - allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1 - } - for mKey, mVal := range allEndpoints { - if mVal == 1 { - klog.V(2).Infof("Delete endpoint %s for service %q", mKey, svcPort) - removeSessionAffinityByEndpoint(state, svcPort, mKey) + if !newEndpointsSet.Has(existingEndpoint) { + klog.V(2).Infof("Delete endpoint %s for service %q", existingEndpoint, svcPort) + removeSessionAffinityByEndpoint(state, svcPort, existingEndpoint) } } } @@ -234,11 +233,9 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) { if !exists || state == nil || len(newEndpoints) > 0 { klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) - lb.updateAffinityMap(svcPort, newEndpoints) // OnEndpointsAdd can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created - // if one does not already exist. The affinity will be updated - // later, once NewService is called. + // if one does not already exist. state = lb.newServiceInternal(svcPort, v1.ServiceAffinity(""), 0) state.endpoints = util.ShuffleStrings(newEndpoints) @@ -268,7 +265,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) - lb.updateAffinityMap(svcPort, newEndpoints) + lb.removeStaleAffinity(svcPort, newEndpoints) // OnEndpointsUpdate can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created // if one does not already exist. The affinity will be updated