From 76a7d690db0c4275a627d9e83d834579577151b4 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 7 Mar 2017 13:32:02 -0600 Subject: [PATCH 1/2] kube-proxy: simplify endpoints updates We don't need the svcPortToInfoMap. Its only purpose was to send "valid" local endpoints (those with valid IP and >0 port) to the health checker. But we shouldn't be sending invalid endpoints to the health checker anyway, because it can't do anything with them. If we exclude invalid endpoints earlier, then we don't need flattenValidEndpoints(). And if we don't need flattenValidEndpoints() it makes no sense to have svcPortToInfoMap store hostPortInfo, since endpointsInfo is the same thing as hostPortInfo except with a combined host:port. And if svcPortToInfoMap now only stores valid endpointsInfos, it is exactly the same thing as newEndpoints. --- pkg/proxy/iptables/proxier.go | 95 +++++++----------------------- pkg/proxy/iptables/proxier_test.go | 3 +- 2 files changed, 21 insertions(+), 77 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1d2bfe8ceee..e9a3c4b61be 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -556,32 +556,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { utilproxy.DeleteServiceConnections(proxier.exec, staleUDPServices.List()) } -// Reconstruct the list of endpoint infos from the endpointIP list -// Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos -// from the full []hostPortInfo slice. -// -// For e.g. if input is -// endpoints = []hostPortInfo{ {host="1.1.1.1", port=22, localEndpointOnly=}, {host="2.2.2.2", port=80, localEndpointOnly=} } -// endpointIPs = []string{ "2.2.2.2:80" } -// -// then output will be -// -// []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=} } -func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo { - lookupSet := sets.NewString() - for _, ip := range endpointIPs { - lookupSet.Insert(ip) - } - var filteredEndpoints []*endpointsInfo - for _, hpp := range endPoints { - key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)) - if lookupSet.Has(key) { - filteredEndpoints = append(filteredEndpoints, &endpointsInfo{endpoint: key, isLocal: hpp.isLocal}) - } - } - return filteredEndpoints -} - // OnEndpointsUpdate takes in a slice of updated endpoints. func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.mu.Lock() @@ -611,12 +585,9 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN newMap = make(map[proxy.ServicePortName][]*endpointsInfo) staleSet = make(map[endpointServicePair]bool) - // local - svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) - // Update endpoints for services. for i := range allEndpoints { - accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap) + accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP @@ -646,7 +617,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN allSvcPorts[svcPort] = true } for svcPort := range allSvcPorts { - updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker) + updateHealthCheckEntries(svcPort.NamespacedName, newMap[svcPort], healthChecker) } return newMap, staleSet @@ -663,8 +634,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN // - naming is poor and responsibilities are muddled func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, curEndpoints map[proxy.ServicePortName][]*endpointsInfo, - newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, - svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) { + newEndpoints *map[proxy.ServicePortName][]*endpointsInfo) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. @@ -672,70 +642,45 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, ss := &endpoints.Subsets[i] for i := range ss.Ports { port := &ss.Ports[i] + if port.Port == 0 { + glog.Warningf("ignoring invalid endpoint port %s", port.Name) + continue + } svcPort := proxy.ServicePortName{ NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: port.Name, } for i := range ss.Addresses { addr := &ss.Addresses[i] - hostPortObject := hostPortInfo{ - host: addr.IP, - port: int(port.Port), - isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + if addr.IP == "" { + glog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) + continue } - (*svcPortToInfoMap)[svcPort] = append((*svcPortToInfoMap)[svcPort], hostPortObject) + epInfo := &endpointsInfo{ + endpoint: net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))), + isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + } + (*newEndpoints)[svcPort] = append((*newEndpoints)[svcPort], epInfo) } } } - // Decompose the lists of endpoints into details of what was changed for the caller. - for svcPort, hostPortInfos := range *svcPortToInfoMap { - newEPList := flattenValidEndpoints(hostPortInfos) - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) - // Once the set operations using the list of ips are complete, build the list of endpoint infos - (*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList) - } } // updateHealthCheckEntries - send the new set of local endpoints to the health checker -func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) { +func updateHealthCheckEntries(name types.NamespacedName, endpoints []*endpointsInfo, healthChecker healthChecker) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { return } // Use a set instead of a slice to provide deduplication - endpoints := sets.NewString() - for _, portInfo := range hostPorts { + epSet := sets.NewString() + for _, portInfo := range endpoints { if portInfo.isLocal { // kube-proxy health check only needs local endpoints - endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) + epSet.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) } } - healthChecker.UpdateEndpoints(name, endpoints) -} - -// used in OnEndpointsUpdate -type hostPortInfo struct { - host string - port int - isLocal bool -} - -func isValidEndpoint(hpp *hostPortInfo) bool { - return hpp.host != "" && hpp.port > 0 -} - -func flattenValidEndpoints(endpoints []hostPortInfo) []string { - // Convert Endpoint objects into strings for easier use later. - var result []string - for i := range endpoints { - hpp := &endpoints[i] - if isValidEndpoint(hpp) { - result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) - } else { - glog.Warningf("got invalid endpoint: %+v", *hpp) - } - } - return result + healthChecker.UpdateEndpoints(name, epSet) } // portProtoHash takes the ServicePortName and protocol for a service diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 6fb4532174c..d677aee46dc 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1337,8 +1337,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { for tci, tc := range testCases { // outputs newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} - svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} - accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints, &svcPortToInfoMap) + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints) if len(newEndpoints) != len(tc.expectedNew) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) From f7630f888f59f8c19030e6c87a0e334cea9f4786 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 7 Mar 2017 13:51:58 -0600 Subject: [PATCH 2/2] kube-proxy/iptables: use a type for endpoints info map --- pkg/proxy/iptables/proxier.go | 16 +++++++++------- pkg/proxy/iptables/proxier_test.go | 2 +- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e9a3c4b61be..e77f77e8612 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -188,12 +188,14 @@ func newServiceInfo(serviceName proxy.ServicePortName, port *api.ServicePort, se type proxyServiceMap map[proxy.ServicePortName]*serviceInfo +type proxyEndpointMap map[proxy.ServicePortName][]*endpointsInfo + // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { mu sync.Mutex // protects the following fields serviceMap proxyServiceMap - endpointsMap map[proxy.ServicePortName][]*endpointsInfo + endpointsMap proxyEndpointMap portsMap map[localPort]closeable haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event @@ -320,7 +322,7 @@ func NewProxier(ipt utiliptables.Interface, return &Proxier{ serviceMap: make(proxyServiceMap), - endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo), + endpointsMap: make(proxyEndpointMap), portsMap: make(map[localPort]closeable), syncPeriod: syncPeriod, minSyncPeriod: minSyncPeriod, @@ -578,11 +580,11 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { } // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, - healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, staleSet map[endpointServicePair]bool) { +func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, hostname string, + healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) { // return values - newMap = make(map[proxy.ServicePortName][]*endpointsInfo) + newMap = make(proxyEndpointMap) staleSet = make(map[endpointServicePair]bool) // Update endpoints for services. @@ -633,8 +635,8 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN // - the test for this is overlapped by the test for updateEndpoints // - naming is poor and responsibilities are muddled func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, - curEndpoints map[proxy.ServicePortName][]*endpointsInfo, - newEndpoints *map[proxy.ServicePortName][]*endpointsInfo) { + curEndpoints proxyEndpointMap, + newEndpoints *proxyEndpointMap) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index d677aee46dc..fb409f1270e 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1336,7 +1336,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { for tci, tc := range testCases { // outputs - newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} + newEndpoints := make(proxyEndpointMap) accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints) if len(newEndpoints) != len(tc.expectedNew) {