refactor updateEndpointMap and updateServiceMap results

Minhan Xia 2017-07-06 15:53:03 -07:00
parent 25ac521f88
commit 46d3e83caf


@@ -250,6 +250,17 @@ type serviceChangeMap struct {
 	items map[types.NamespacedName]*serviceChange
 }
+type updateEndpointMapResult struct {
+	hcEndpoints       map[types.NamespacedName]int
+	staleEndpoints    map[endpointServicePair]bool
+	staleServiceNames map[proxy.ServicePortName]bool
+}
+type updateServiceMapResult struct {
+	hcServices    map[types.NamespacedName]uint16
+	staleServices sets.String
+}
 type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
 type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
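
The hunk above replaces the multi-value returns of updateServiceMap and updateEndpointsMap with these two result structs. The following is a minimal, self-contained sketch of that result-struct pattern; the Result type, buildResult function, and string keys are simplified stand-ins for illustration, not the proxier's real types.

package main

import "fmt"

// Result groups what used to be separate return values, mirroring the shape
// of updateServiceMapResult/updateEndpointMapResult above.
type Result struct {
	HCEndpoints   map[string]int
	StaleServices map[string]bool
}

// buildResult populates the named result value field by field and returns it,
// instead of returning each map on its own.
func buildResult() (result Result) {
	result.HCEndpoints = map[string]int{"ns/svc": 2}
	result.StaleServices = map[string]bool{"10.0.0.1": true}
	return result
}

func main() {
	r := buildResult()
	fmt.Println(r.HCEndpoints, r.StaleServices)
}
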
@@ -694,29 +705,29 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
 // <changes> map is cleared after applying them.
 func updateServiceMap(
 	serviceMap proxyServiceMap,
-	changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
-	staleServices = sets.NewString()
+	changes *serviceChangeMap) (result updateServiceMapResult) {
+	result.staleServices = sets.NewString()
 	func() {
 		changes.lock.Lock()
 		defer changes.lock.Unlock()
 		for _, change := range changes.items {
 			existingPorts := serviceMap.merge(change.current)
-			serviceMap.unmerge(change.previous, existingPorts, staleServices)
+			serviceMap.unmerge(change.previous, existingPorts, result.staleServices)
 		}
 		changes.items = make(map[types.NamespacedName]*serviceChange)
 	}()
 	// TODO: If this will appear to be computationally expensive, consider
 	// computing this incrementally similarly to serviceMap.
-	hcServices = make(map[types.NamespacedName]uint16)
+	result.hcServices = make(map[types.NamespacedName]uint16)
 	for svcPortName, info := range serviceMap {
 		if info.healthCheckNodePort != 0 {
-			hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
+			result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
 		}
 	}
-	return hcServices, staleServices
+	return result
 }
 func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
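
As the hunk above shows, updateServiceMap drains the pending changes inside an immediately-invoked func literal so that the deferred Unlock releases changes.lock before the health-check map is rebuilt. Below is a minimal sketch of that lock-scoping idiom using assumed, simplified types (pendingChanges, drain); it is not the proxier's code.

package main

import (
	"fmt"
	"sync"
)

type pendingChanges struct {
	lock  sync.Mutex
	items map[string]string
}

// drain applies the pending items while holding the lock only inside the
// func literal; the rest of drain runs with the lock already released.
func drain(changes *pendingChanges, applied map[string]string) {
	func() {
		changes.lock.Lock()
		defer changes.lock.Unlock() // fires when the literal returns, not at the end of drain
		for k, v := range changes.items {
			applied[k] = v
		}
		changes.items = make(map[string]string) // pending set is cleared after applying
	}()
	fmt.Println("applied without holding the lock:", applied)
}

func main() {
	c := &pendingChanges{items: map[string]string{"ns/svc:port": "10.0.0.1"}}
	drain(c, map[string]string{})
}
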
@@ -755,16 +766,17 @@ func (proxier *Proxier) OnEndpointsSynced() {
 func updateEndpointsMap(
 	endpointsMap proxyEndpointsMap,
 	changes *endpointsChangeMap,
-	hostname string) (hcEndpoints map[types.NamespacedName]int, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
-	staleEndpoints = make(map[endpointServicePair]bool)
-	staleServiceNames = make(map[proxy.ServicePortName]bool)
+	hostname string) (result updateEndpointMapResult) {
+	result.staleEndpoints = make(map[endpointServicePair]bool)
+	result.staleServiceNames = make(map[proxy.ServicePortName]bool)
 	func() {
 		changes.lock.Lock()
 		defer changes.lock.Unlock()
 		for _, change := range changes.items {
 			endpointsMap.unmerge(change.previous)
 			endpointsMap.merge(change.current)
-			detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
+			detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
 		}
 		changes.items = make(map[types.NamespacedName]*endpointsChange)
 	}()
@@ -775,13 +787,13 @@ func updateEndpointsMap(
 	// TODO: If this will appear to be computationally expensive, consider
 	// computing this incrementally similarly to endpointsMap.
-	hcEndpoints = make(map[types.NamespacedName]int)
+	result.hcEndpoints = make(map[types.NamespacedName]int)
 	localIPs := getLocalIPs(endpointsMap)
 	for nsn, ips := range localIPs {
-		hcEndpoints[nsn] = len(ips)
+		result.hcEndpoints[nsn] = len(ips)
 	}
-	return hcEndpoints, staleEndpoints, staleServiceNames
+	return result
 }
 // <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
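
The comment above notes that detectStaleConnections fills the stale maps in place rather than returning them; after this change those maps simply live inside result. The sketch below, using made-up names (result, markStale, plain string endpoints), illustrates why that works: Go maps behave as references, so the callee's writes are visible through the caller's struct.

package main

import "fmt"

type result struct {
	staleEndpoints map[string]bool
}

// markStale stands in for detectStaleConnections: it writes into the map it
// is handed and returns nothing.
func markStale(previous, current []string, stale map[string]bool) {
	cur := map[string]bool{}
	for _, ep := range current {
		cur[ep] = true
	}
	for _, ep := range previous {
		if !cur[ep] { // endpoint disappeared, so its connections may be stale
			stale[ep] = true
		}
	}
}

func main() {
	r := result{staleEndpoints: map[string]bool{}}
	markStale([]string{"10.0.0.1:53", "10.0.0.2:53"}, []string{"10.0.0.2:53"}, r.staleEndpoints)
	fmt.Println(r.staleEndpoints) // map[10.0.0.1:53:true]
}
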
@@ -986,17 +998,17 @@ func (proxier *Proxier) syncProxyRules() {
 		return
 	}
-	var staleServices sets.String
 	// We assume that if this was called, we really want to sync them,
 	// even if nothing changed in the meantime. In other words, callers are
 	// responsible for detecting no-op changes and not calling this function.
-	hcServices, staleServices := updateServiceMap(
+	serviceUpdateResult := updateServiceMap(
 		proxier.serviceMap, &proxier.serviceChanges)
-	hcEndpoints, staleEndpoints, staleServiceNames := updateEndpointsMap(
+	endpointUpdateResult := updateEndpointsMap(
 		proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
+	staleServices := serviceUpdateResult.staleServices
 	// merge stale services gathered from updateEndpointsMap
-	for svcPortName := range staleServiceNames {
+	for svcPortName := range endpointUpdateResult.staleServiceNames {
 		if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
 			glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
 			staleServices.Insert(svcInfo.clusterIP.String())
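
On the consumer side, syncProxyRules now reads the grouped fields instead of loose variables and still merges services flagged stale by the endpoints update into the stale-service set, keeping only UDP services since only their conntrack entries need clearing. The sketch below mirrors that merge with simplified stand-in types (serviceUpdate, endpointUpdate, svcInfo) and made-up example values.

package main

import "fmt"

type serviceUpdate struct {
	staleServices map[string]bool // keyed by cluster IP
}

type endpointUpdate struct {
	staleServiceNames []string
}

type svcInfo struct {
	protocol  string
	clusterIP string
}

func main() {
	serviceMap := map[string]svcInfo{
		"ns/svc:port": {protocol: "UDP", clusterIP: "10.0.0.10"},
	}
	su := serviceUpdate{staleServices: map[string]bool{}}
	eu := endpointUpdate{staleServiceNames: []string{"ns/svc:port"}}

	staleServices := su.staleServices
	// Mirror the protocol check: only UDP services need conntrack cleanup.
	for _, name := range eu.staleServiceNames {
		if info, ok := serviceMap[name]; ok && info.protocol == "UDP" {
			staleServices[info.clusterIP] = true
		}
	}
	fmt.Println(staleServices)
}
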
@@ -1609,17 +1621,17 @@ func (proxier *Proxier) syncProxyRules() {
 	// Update healthchecks. The endpoints list might include services that are
 	// not "OnlyLocal", but the services list will not, and the healthChecker
 	// will just drop those endpoints.
-	if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
+	if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
 		glog.Errorf("Error syncing healtcheck services: %v", err)
 	}
-	if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
+	if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
 		glog.Errorf("Error syncing healthcheck endoints: %v", err)
 	}
 	// Finish housekeeping.
 	// TODO: these and clearUDPConntrackForPort() could be made more consistent.
 	utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
-	proxier.deleteEndpointConnections(staleEndpoints)
+	proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
 }
 // Clear UDP conntrack for port or all conntrack entries when port equal zero.