diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1d2bfe8ceee..e9a3c4b61be 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -556,32 +556,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } -// Reconstruct the list of endpoint infos from the endpointIP list -// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos -// from the full []hostPortInfo slice. -// -// For e.g. if input is -// endpoints = []hostPortInfo{ {host="1.1.1.1", port=22, localEndpointOnly=}, {host="2.2.2.2", port=80, localEndpointOnly=} } -// endpointIPs = []string{ "2.2.2.2:80" } -// -// then output will be -// -// []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=} } -func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo { - lookupSet := sets.NewString() - for _, ip := range endpointIPs { - lookupSet.Insert(ip) - } - var filteredEndpoints []*endpointsInfo - for _, hpp := range endPoints { - key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)) - if lookupSet.Has(key) { - filteredEndpoints = append(filteredEndpoints, &endpointsInfo{endpoint: key, isLocal: hpp.isLocal}) - } - } - return filteredEndpoints -} - // OnEndpointsUpdate takes in a slice of updated endpoints. func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.mu.Lock() @@ -611,12 +585,9 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN newMap = make(map[proxy.ServicePortName][]*endpointsInfo) staleSet = make(map[endpointServicePair]bool) - // local - svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) - // Update endpoints for services. for i := range allEndpoints { - accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap) + accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP @@ -646,7 +617,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN allSvcPorts[svcPort] = true } for svcPort := range allSvcPorts { - updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker) + updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker) } return newMap, staleSet @@ -663,8 +634,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN // - naming is poor and responsibilities are muddled func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, curEndpoints map[proxy.ServicePortName][]*endpointsInfo, - newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, - svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) { + newEndpoints *map[proxy.ServicePortName][]*endpointsInfo) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. @@ -672,70 +642,45 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, ss := &endpoints.Subsets[i] for i := range ss.Ports { port := &ss.Ports[i] + if port.Port == 0 { + glog.Warningf("ignoring invalid endpoint port %s", port.Name) + continue + } svcPort := proxy.ServicePortName{ NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: port.Name, } for i := range ss.Addresses { addr := &ss.Addresses[i] - hostPortObject := hostPortInfo{ - host: addr.IP, - port: int(port.Port), - isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + if addr.IP == "" { + glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) + continue } - (*svcPortToInfoMap)[svcPort] = append((*svcPortToInfoMap)[svcPort], hostPortObject) + epInfo := &endpointsInfo{ + endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), + isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + } + (*newEndpoints)[svcPort] = append((*newEndpoints)[svcPort], epInfo) } } } - // Decompose the lists of endpoints into details of what was changed for the caller. - for svcPort, hostPortInfos := range *svcPortToInfoMap { - newEPList := flattenValidEndpoints(hostPortInfos) - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) - // Once the set operations using the list of ips are complete, build the list of endpoint infos - (*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList) - } } // updateHealthCheckEntries - send the new set of local endpoints to the health checker -func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) { +func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { return } // Use a set instead of a slice to provide deduplication - endpoints := sets.NewString() - for _, portInfo := range hostPorts { + epSet := sets.NewString() + for _, portInfo := range endpoints { if portInfo.isLocal { // kube-proxy health check only needs local endpoints - endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) + epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) } } - healthChecker.UpdateEndpoints(name, endpoints) -} - -// used in OnEndpointsUpdate -type hostPortInfo struct { - host string - port int - isLocal bool -} - -func isValidEndpoint(hpp *hostPortInfo) bool { - return hpp.host != "" && hpp.port > 0 -} - -func flattenValidEndpoints(endpoints []hostPortInfo) []string { - // Convert Endpoint objects into strings for easier use later. - var result []string - for i := range endpoints { - hpp := &endpoints[i] - if isValidEndpoint(hpp) { - result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) - } else { - glog.Warningf("got invalid endpoint: %+v", *hpp) - } - } - return result + healthChecker.UpdateEndpoints(name, epSet) } // portProtoHash takes the ServicePortName and protocol for a service diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 6fb4532174c..d677aee46dc 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1337,8 +1337,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { for tci, tc := range testCases { // outputs newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} - svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} - accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints, &svcPortToInfoMap) + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints) if len(newEndpoints) != len(tc.expectedNew) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints))