Merge remote-tracking branch 'thockin/userspace-proxy-affinity-25314' into kubeproxyfix

This commit is contained in:
Minhan Xia 2016-07-01 11:48:02 -07:00
commit 51dcff40e6
3 changed files with 15 additions and 1 deletions

View File

@ -29,5 +29,6 @@ type LoadBalancer interface {
// service-port and source address. // service-port and source address.
NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error) NextEndpoint(service proxy.ServicePortName, srcAddr net.Addr, sessionAffinityReset bool) (string, error)
NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error NewService(service proxy.ServicePortName, sessionAffinityType api.ServiceAffinity, stickyMaxAgeMinutes int) error
DeleteService(service proxy.ServicePortName)
CleanupStaleStickySessions(service proxy.ServicePortName) CleanupStaleStickySessions(service proxy.ServicePortName)
} }

View File

@ -447,6 +447,7 @@ func (proxier *Proxier) OnServiceUpdate(services []api.Service) {
if err != nil { if err != nil {
glog.Errorf("Failed to stop service %q: %v", name, err) glog.Errorf("Failed to stop service %q: %v", name, err)
} }
proxier.loadBalancer.DeleteService(name)
} }
} }
} }

View File

@ -82,6 +82,7 @@ func NewLoadBalancerRR() *LoadBalancerRR {
} }
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error { func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType api.ServiceAffinity, ttlMinutes int) error {
glog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
lb.newServiceInternal(svcPort, affinityType, ttlMinutes) lb.newServiceInternal(svcPort, affinityType, ttlMinutes)
@ -103,6 +104,13 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi
return lb.services[svcPort] return lb.services[svcPort]
} }
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
glog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
lb.lock.Lock()
defer lb.lock.Unlock()
delete(lb.services, svcPort)
}
// return true if this service is using some form of session affinity. // return true if this service is using some form of session affinity.
func isSessionAffinity(affinity *affinityPolicy) bool { func isSessionAffinity(affinity *affinityPolicy) bool {
// Should never be empty string, but checking for it to be safe. // Should never be empty string, but checking for it to be safe.
@ -281,7 +289,11 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
for k := range lb.services { for k := range lb.services {
if _, exists := registeredEndpoints[k]; !exists { if _, exists := registeredEndpoints[k]; !exists {
glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k) glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k)
delete(lb.services, k) // Reset but don't delete.
state := lb.services[k]
state.endpoints = []string{}
state.index = 0
state.affinity.affinityMap = map[string]*affinityState{}
} }
} }
} }