diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 57446aeae81..0a8123be094 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -250,6 +250,17 @@ type serviceChangeMap struct { items map[types.NamespacedName]*serviceChange } +type updateEndpointMapResult struct { + hcEndpoints map[types.NamespacedName]int + staleEndpoints map[endpointServicePair]bool + staleServiceNames map[proxy.ServicePortName]bool +} + +type updateServiceMapResult struct { + hcServices map[types.NamespacedName]uint16 + staleServices sets.String +} + type proxyServiceMap map[proxy.ServicePortName]*serviceInfo type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo @@ -694,29 +705,29 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool // map is cleared after applying them. func updateServiceMap( serviceMap proxyServiceMap, - changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) { - staleServices = sets.NewString() + changes *serviceChangeMap) (result updateServiceMapResult) { + result.staleServices = sets.NewString() func() { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { existingPorts := serviceMap.merge(change.current) - serviceMap.unmerge(change.previous, existingPorts, staleServices) + serviceMap.unmerge(change.previous, existingPorts, result.staleServices) } changes.items = make(map[types.NamespacedName]*serviceChange) }() // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to serviceMap. - hcServices = make(map[types.NamespacedName]uint16) + result.hcServices = make(map[types.NamespacedName]uint16) for svcPortName, info := range serviceMap { if info.healthCheckNodePort != 0 { - hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) + result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort) } } - return hcServices, staleServices + return result } func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) { @@ -755,16 +766,17 @@ func (proxier *Proxier) OnEndpointsSynced() { func updateEndpointsMap( endpointsMap proxyEndpointsMap, changes *endpointsChangeMap, - hostname string) (hcEndpoints map[types.NamespacedName]int, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) { - staleEndpoints = make(map[endpointServicePair]bool) - staleServiceNames = make(map[proxy.ServicePortName]bool) + hostname string) (result updateEndpointMapResult) { + result.staleEndpoints = make(map[endpointServicePair]bool) + result.staleServiceNames = make(map[proxy.ServicePortName]bool) + func() { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { endpointsMap.unmerge(change.previous) endpointsMap.merge(change.current) - detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) + detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames) } changes.items = make(map[types.NamespacedName]*endpointsChange) }() @@ -775,13 +787,13 @@ func updateEndpointsMap( // TODO: If this will appear to be computationally expensive, consider // computing this incrementally similarly to endpointsMap. - hcEndpoints = make(map[types.NamespacedName]int) + result.hcEndpoints = make(map[types.NamespacedName]int) localIPs := getLocalIPs(endpointsMap) for nsn, ips := range localIPs { - hcEndpoints[nsn] = len(ips) + result.hcEndpoints[nsn] = len(ips) } - return hcEndpoints, staleEndpoints, staleServiceNames + return result } // and are modified by this function with detected stale connections. @@ -986,17 +998,17 @@ func (proxier *Proxier) syncProxyRules() { return } - var staleServices sets.String // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. - hcServices, staleServices := updateServiceMap( + serviceUpdateResult := updateServiceMap( proxier.serviceMap, &proxier.serviceChanges) - hcEndpoints, staleEndpoints, staleServiceNames := updateEndpointsMap( + endpointUpdateResult := updateEndpointsMap( proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname) + staleServices := serviceUpdateResult.staleServices // merge stale services gathered from updateEndpointsMap - for svcPortName := range staleServiceNames { + for svcPortName := range endpointUpdateResult.staleServiceNames { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP { glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String()) staleServices.Insert(svcInfo.clusterIP.String()) @@ -1609,17 +1621,17 @@ func (proxier *Proxier) syncProxyRules() { // Update healthchecks. The endpoints list might include services that are // not "OnlyLocal", but the services list will not, and the healthChecker // will just drop those endpoints. - if err := proxier.healthChecker.SyncServices(hcServices); err != nil { + if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil { glog.Errorf("Error syncing healtcheck services: %v", err) } - if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil { + if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil { glog.Errorf("Error syncing healthcheck endoints: %v", err) } // Finish housekeeping. // TODO: these and clearUDPConntrackForPort() could be made more consistent. utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List()) - proxier.deleteEndpointConnections(staleEndpoints) + proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints) } // Clear UDP conntrack for port or all conntrack entries when port equal zero.