From 1c180e0865a63bec5848075bdd59c04fa7387258 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 1 Feb 2017 11:36:24 -0800 Subject: [PATCH 1/9] Simplify "is local" detection Move the feature test to where we are activating the feature, rather than where we detect locality. This is in service of better tests, which is in service of less-frequent resyncing, which is going to require refactoring. --- pkg/proxy/iptables/proxier.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 311ef58d04b..612f8cfb638 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -606,15 +606,10 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { port := &ss.Ports[i] for i := range ss.Addresses { addr := &ss.Addresses[i] - var isLocalEndpoint bool - if addr.NodeName != nil { - isLocalEndpoint = *addr.NodeName == proxier.hostname - isLocalEndpoint = utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) && isLocalEndpoint - } hostPortObject := hostPortInfo{ host: addr.IP, port: int(port.Port), - localEndpoint: isLocalEndpoint, + localEndpoint: addr.NodeName != nil && *addr.NodeName == proxier.hostname, } portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) } @@ -675,6 +670,10 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // updateHealthCheckEntries - send the new set of local endpoints to the health checker func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { + if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { + return + } + // Use a set instead of a slice to provide deduplication endpoints := sets.NewString() for _, portInfo := range hostPorts { From d578105a44e55319706871ae1157ec1e36ede4da Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 1 Feb 2017 12:17:41 -0800 Subject: [PATCH 2/9] Simple cleanup before refactoring --- pkg/proxy/iptables/proxier.go | 36 +++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 612f8cfb638..8d2127af322 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -150,8 +150,8 @@ type serviceInfo struct { // internal struct for endpoints information type endpointsInfo struct { - ip string - localEndpoint bool + endpoint string // TODO: should be an endpointString type + isLocal bool } // returns a new serviceInfo struct @@ -546,7 +546,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { func flattenEndpointsInfo(endPoints []*endpointsInfo) []string { var endpointIPs []string for _, ep := range endPoints { - endpointIPs = append(endpointIPs, ep.ip) + endpointIPs = append(endpointIPs, ep.endpoint) } return endpointIPs } @@ -562,7 +562,7 @@ func flattenEndpointsInfo(endPoints []*endpointsInfo) []string { // then output will be // // []endpointsInfo{ {"2.2.2.2:80", localEndpointOnly=} } -func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo { +func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*endpointsInfo { lookupSet := sets.NewString() for _, ip := range endpointIPs { lookupSet.Insert(ip) @@ -571,7 +571,7 @@ func (proxier *Proxier) buildEndpointInfoList(endPoints []hostPortInfo, endpoint for _, hpp := range endPoints { key := net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)) if lookupSet.Has(key) { - filteredEndpoints = append(filteredEndpoints, &endpointsInfo{ip: key, localEndpoint: hpp.localEndpoint}) + filteredEndpoints = append(filteredEndpoints, &endpointsInfo{endpoint: key, isLocal: hpp.isLocal}) } } return filteredEndpoints @@ -607,9 +607,9 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { for i := range ss.Addresses { addr := &ss.Addresses[i] hostPortObject := hostPortInfo{ - host: addr.IP, - port: int(port.Port), - localEndpoint: addr.NodeName != nil && *addr.NodeName == proxier.hostname, + host: addr.IP, + port: int(port.Port), + isLocal: addr.NodeName != nil && *addr.NodeName == proxier.hostname, } portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) } @@ -631,7 +631,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { } } // Once the set operations using the list of ips are complete, build the list of endpoint infos - newEndpointsMap[svcPort] = proxier.buildEndpointInfoList(portsToEndpoints[portname], newEndpoints) + newEndpointsMap[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEndpoints) activeEndpoints[svcPort] = true } } @@ -641,7 +641,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { glog.V(2).Infof("Removing endpoints for %q", svcPort) // record endpoints of unactive service to stale connections for _, ep := range proxier.endpointsMap[svcPort] { - staleConnections[endpointServicePair{endpoint: ep.ip, servicePortName: svcPort}] = true + staleConnections[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true } } } @@ -677,7 +677,7 @@ func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, host // Use a set instead of a slice to provide deduplication endpoints := sets.NewString() for _, portInfo := range hostPorts { - if portInfo.localEndpoint { + if portInfo.isLocal { // kube-proxy health check only needs local endpoints endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) } @@ -687,9 +687,9 @@ func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, host // used in OnEndpointsUpdate type hostPortInfo struct { - host string - port int - localEndpoint bool + host string + port int + isLocal bool } func isValidEndpoint(hpp *hostPortInfo) bool { @@ -1194,7 +1194,7 @@ func (proxier *Proxier) syncProxyRules() { endpointChains := make([]utiliptables.Chain, 0) for _, ep := range proxier.endpointsMap[svcName] { endpoints = append(endpoints, ep) - endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip) + endpointChain := servicePortEndpointChainName(svcName, protocol, ep.endpoint) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. @@ -1244,14 +1244,14 @@ func (proxier *Proxier) syncProxyRules() { } // Handle traffic that loops back to the originator with SNAT. writeLine(natRules, append(args, - "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]), + "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].endpoint, ":")[0]), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. - args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip) + args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint) writeLine(natRules, args...) } @@ -1265,7 +1265,7 @@ func (proxier *Proxier) syncProxyRules() { localEndpoints := make([]*endpointsInfo, 0) localEndpointChains := make([]utiliptables.Chain, 0) for i := range endpointChains { - if endpoints[i].localEndpoint { + if endpoints[i].isLocal { // These slices parallel each other; must be kept in sync localEndpoints = append(localEndpoints, endpoints[i]) localEndpointChains = append(localEndpointChains, endpointChains[i]) From 9507af3c79a8ecb90e6912e5f7d80de7f87385af Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 1 Feb 2017 12:47:29 -0800 Subject: [PATCH 3/9] Refactor OnEndpointsUpdate for testing This is a weird function, but I didn't want to change any semantics until the tests are in place. Testing exposed one bug where stale connections of renamed ports were not marked stale. There are other things that seem wrong here, more will follow. --- pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 94 +++++---- pkg/proxy/iptables/proxier_test.go | 309 +++++++++++++++++++++++++++++ 3 files changed, 365 insertions(+), 39 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 0c5cc6891d1..0df7cc5a7d4 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -46,6 +46,7 @@ go_test( "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", + "//vendor:github.com/davecgh/go-spew/spew", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 8d2127af322..5b27d6644f6 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -595,47 +595,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Update endpoints for services. for i := range allEndpoints { - svcEndpoints := &allEndpoints[i] - - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortInfo{} - for i := range svcEndpoints.Subsets { - ss := &svcEndpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - hostPortObject := hostPortInfo{ - host: addr.IP, - port: int(port.Port), - isLocal: addr.NodeName != nil && *addr.NodeName == proxier.hostname, - } - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) - } - } - } - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} - svcPortToInfoMap[svcPort] = portsToEndpoints[portname] - curEndpoints := proxier.endpointsMap[svcPort] - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - // Flatten the list of current endpoint infos to just a list of ips as strings - curEndpointIPs := flattenEndpointsInfo(curEndpoints) - if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) { - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) - // Gather stale connections to removed endpoints - removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints) - for _, ep := range removedEndpoints { - staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true - } - } - // Once the set operations using the list of ips are complete, build the list of endpoint infos - newEndpointsMap[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEndpoints) - activeEndpoints[svcPort] = true - } + accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap, + &svcPortToInfoMap, &staleConnections, &activeEndpoints) } // Check stale connections against endpoints missing from the update. + // TODO: we should really only mark a connection stale if the proto was UDP + // and the (ip, port, proto) was removed from the endpoints. for svcPort := range proxier.endpointsMap { if !activeEndpoints[svcPort] { glog.V(2).Infof("Removing endpoints for %q", svcPort) @@ -668,6 +633,57 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.deleteEndpointConnections(staleConnections) } +// Gather information about all the endpoint state for a given api.Endpoints. +// This can not detect all stale connections, so the caller should also check +// for entries that were totally removed. +func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, + curEndpoints map[proxy.ServicePortName][]*endpointsInfo, + newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, + svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, + staleConnections *map[endpointServicePair]bool, + activeEndpoints *map[proxy.ServicePortName]bool) { + + // We need to build a map of portname -> all ip:ports for that + // portname. Explode Endpoints.Subsets[*] into this structure. + portsToEndpoints := map[string][]hostPortInfo{} + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + hostPortObject := hostPortInfo{ + host: addr.IP, + port: int(port.Port), + isLocal: addr.NodeName != nil && *addr.NodeName == hostname, + } + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) + } + } + } + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, + Port: portname, + } + (*svcPortToInfoMap)[svcPort] = portsToEndpoints[portname] + newEPList := flattenValidEndpoints(portsToEndpoints[portname]) + // Flatten the list of current endpoint infos to just a list of ips as strings + curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort]) + if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) { + glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) + // Gather stale connections to removed endpoints + removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEPList) + for _, ep := range removedEndpoints { + (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true + } + } + // Once the set operations using the list of ips are complete, build the list of endpoint infos + (*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList) + (*activeEndpoints)[svcPort] = true + } +} + // updateHealthCheckEntries - send the new set of local endpoints to the health checker func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 472432b8313..e763b95d36f 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -19,6 +19,8 @@ package iptables import ( "testing" + "github.com/davecgh/go-spew/spew" + "fmt" "net" "strings" @@ -1172,4 +1174,311 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { } } +// This is a coarse test, but it offers some modicum of confidence as the code is evolved. +func Test_accumulateEndpointsMap(t *testing.T) { + testCases := []struct { + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo + expectedActive []proxy.ServicePortName + expectedStale []endpointServicePair + }{{ + // Case[0]: nothing + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedActive: []proxy.ServicePortName{}, + expectedStale: []endpointServicePair{}, + }, { + // Case[1]: no changes, unnamed port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[2]: no changes, named port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "port", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "port"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "port")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[3]: new port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): {}, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, + expectedStale: []endpointServicePair{}, + }, { + // Case[4]: remove port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedActive: []proxy.ServicePortName{}, + expectedStale: []endpointServicePair{ /* can't detect this one */ }, + }, { + // Case[5]: new IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }, { + Name: "p2", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + makeServicePortName("ns1", "ep1", "p2"), + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[6]: remove IP and port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + {"2.2.2.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + }, + expectedStale: []endpointServicePair{{ + endpoint: "2.2.2.2:11", + servicePortName: makeServicePortName("ns1", "ep1", "p1"), + }}, + }, { + // Case[7]: rename port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p2", + Port: 11, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p2"): { + {"1.1.1.1:11", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p2"), + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[8]: renumber port + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p1", + Port: 22, + }}, + }, + } + }), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:11", false}, + }, + }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p1"): { + {"1.1.1.1:22", false}, + }, + }, + expectedActive: []proxy.ServicePortName{ + makeServicePortName("ns1", "ep1", "p1"), + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", "p1"), + }}, + }} + + for tci, tc := range testCases { + // outputs + newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} + svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} + staleConnections := map[endpointServicePair]bool{} + activeEndpoints := map[proxy.ServicePortName]bool{} + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, + &newEndpoints, &svcPortToInfoMap, &staleConnections, &activeEndpoints) + + if len(newEndpoints) != len(tc.expectedNew) { + t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) + } + for x := range tc.expectedNew { + if len(newEndpoints[x]) != len(tc.expectedNew[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedNew[x]), x, len(newEndpoints[x])) + } else { + for i := range newEndpoints[x] { + if *(newEndpoints[x][i]) != *(tc.expectedNew[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedNew[x][i], *(newEndpoints[x][i])) + } + } + } + } + if len(activeEndpoints) != len(tc.expectedActive) { + t.Errorf("[%d] expected %d active, got %d: %v", tci, len(tc.expectedActive), len(activeEndpoints), activeEndpoints) + } + for _, x := range tc.expectedActive { + if activeEndpoints[x] != true { + t.Errorf("[%d] expected active[%v], but didn't find it: %v", tci, x, activeEndpoints) + } + } + if len(staleConnections) != len(tc.expectedStale) { + t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) + } + for _, x := range tc.expectedStale { + if staleConnections[x] != true { + t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, staleConnections) + } + } + } +} + +func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) api.Endpoints { + ept := api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + eptFunc(&ept) + return ept +} + +func makeServicePortName(ns, name, port string) proxy.ServicePortName { + return proxy.ServicePortName{ + NamespacedName: types.NamespacedName{ + Namespace: ns, + Name: name, + }, + Port: port, + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. From 48ea3047110abc0b532f3178e072ece8c7504903 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 2 Feb 2017 17:24:38 -0800 Subject: [PATCH 4/9] Sanitize newEndpoints semantics, remove a dup arg --- pkg/proxy/iptables/proxier.go | 12 +++---- pkg/proxy/iptables/proxier_test.go | 58 ++++++++---------------------- 2 files changed, 20 insertions(+), 50 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 5b27d6644f6..deba2cc6f83 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -588,7 +588,6 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { defer proxier.mu.Unlock() proxier.haveReceivedEndpointsUpdate = true - activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set staleConnections := make(map[endpointServicePair]bool) svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo) @@ -596,14 +595,14 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Update endpoints for services. for i := range allEndpoints { accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap, - &svcPortToInfoMap, &staleConnections, &activeEndpoints) + &svcPortToInfoMap, &staleConnections) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP // and the (ip, port, proto) was removed from the endpoints. for svcPort := range proxier.endpointsMap { - if !activeEndpoints[svcPort] { - glog.V(2).Infof("Removing endpoints for %q", svcPort) + if _, found := newEndpointsMap[svcPort]; !found { + glog.V(3).Infof("Removing endpoints for %q", svcPort) // record endpoints of unactive service to stale connections for _, ep := range proxier.endpointsMap[svcPort] { staleConnections[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true @@ -640,8 +639,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, curEndpoints map[proxy.ServicePortName][]*endpointsInfo, newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, - staleConnections *map[endpointServicePair]bool, - activeEndpoints *map[proxy.ServicePortName]bool) { + staleConnections *map[endpointServicePair]bool) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. @@ -678,9 +676,9 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true } } + glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) // Once the set operations using the list of ips are complete, build the list of endpoint infos (*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList) - (*activeEndpoints)[svcPort] = true } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index e763b95d36f..194cae464fb 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1177,18 +1177,16 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // This is a coarse test, but it offers some modicum of confidence as the code is evolved. func Test_accumulateEndpointsMap(t *testing.T) { testCases := []struct { - newEndpoints api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedNew map[proxy.ServicePortName][]*endpointsInfo - expectedActive []proxy.ServicePortName - expectedStale []endpointServicePair + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo + expectedStale []endpointServicePair }{{ // Case[0]: nothing - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedActive: []proxy.ServicePortName{}, - expectedStale: []endpointServicePair{}, + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{}, }, { // Case[1]: no changes, unnamed port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1214,8 +1212,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, }, { // Case[2]: no changes, named port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1241,8 +1238,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "port")}, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, }, { // Case[3]: new port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1265,8 +1261,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, }, { // Case[4]: remove port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), @@ -1275,9 +1270,8 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedActive: []proxy.ServicePortName{}, - expectedStale: []endpointServicePair{ /* can't detect this one */ }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{ /* can't detect this one */ }, }, { // Case[5]: new IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1313,10 +1307,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"2.2.2.2:22", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p1"), - makeServicePortName("ns1", "ep1", "p2"), - }, expectedStale: []endpointServicePair{}, }, { // Case[6]: remove IP and port @@ -1348,9 +1338,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p1"), - }, expectedStale: []endpointServicePair{{ endpoint: "2.2.2.2:11", servicePortName: makeServicePortName("ns1", "ep1", "p1"), @@ -1380,10 +1367,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p2"), - }, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{ /* can't detect this one */ }, }, { // Case[8]: renumber port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1409,9 +1393,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:22", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p1"), - }, expectedStale: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p1"), @@ -1423,9 +1404,8 @@ func Test_accumulateEndpointsMap(t *testing.T) { newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} staleConnections := map[endpointServicePair]bool{} - activeEndpoints := map[proxy.ServicePortName]bool{} accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, - &newEndpoints, &svcPortToInfoMap, &staleConnections, &activeEndpoints) + &newEndpoints, &svcPortToInfoMap, &staleConnections) if len(newEndpoints) != len(tc.expectedNew) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) @@ -1441,14 +1421,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { } } } - if len(activeEndpoints) != len(tc.expectedActive) { - t.Errorf("[%d] expected %d active, got %d: %v", tci, len(tc.expectedActive), len(activeEndpoints), activeEndpoints) - } - for _, x := range tc.expectedActive { - if activeEndpoints[x] != true { - t.Errorf("[%d] expected active[%v], but didn't find it: %v", tci, x, activeEndpoints) - } - } if len(staleConnections) != len(tc.expectedStale) { t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) } From 8d24fc398436abe2310d7a997da277545d2bb12b Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 2 Feb 2017 17:37:34 -0800 Subject: [PATCH 5/9] Simplify maps which had almost the exact same info --- pkg/proxy/iptables/proxier.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index deba2cc6f83..e915c3c808e 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -643,11 +643,14 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortInfo{} for i := range endpoints.Subsets { ss := &endpoints.Subsets[i] for i := range ss.Ports { port := &ss.Ports[i] + svcPort := proxy.ServicePortName{ + NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, + Port: port.Name, + } for i := range ss.Addresses { addr := &ss.Addresses[i] hostPortObject := hostPortInfo{ @@ -655,17 +658,13 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, port: int(port.Port), isLocal: addr.NodeName != nil && *addr.NodeName == hostname, } - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject) + (*svcPortToInfoMap)[svcPort] = append((*svcPortToInfoMap)[svcPort], hostPortObject) } } } - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, - Port: portname, - } - (*svcPortToInfoMap)[svcPort] = portsToEndpoints[portname] - newEPList := flattenValidEndpoints(portsToEndpoints[portname]) + // Decompose the lists of endpoints into details of what was changed for the caller. + for svcPort, hostPortInfos := range *svcPortToInfoMap { + newEPList := flattenValidEndpoints(hostPortInfos) // Flatten the list of current endpoint infos to just a list of ips as strings curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort]) if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) { @@ -678,7 +677,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, } glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) // Once the set operations using the list of ips are complete, build the list of endpoint infos - (*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList) + (*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList) } } From 6069d49d49f2f2477219c6028da5e5060d98458d Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 3 Feb 2017 16:05:09 -0800 Subject: [PATCH 6/9] Add tests for updateEndpoints --- pkg/proxy/iptables/proxier.go | 95 +++-- pkg/proxy/iptables/proxier_test.go | 623 +++++++++++++++++++++++++++++ 2 files changed, 679 insertions(+), 39 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e915c3c808e..284e5370d1a 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -588,42 +588,9 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { defer proxier.mu.Unlock() proxier.haveReceivedEndpointsUpdate = true - staleConnections := make(map[endpointServicePair]bool) - svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) - newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo) - - // Update endpoints for services. - for i := range allEndpoints { - accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap, - &svcPortToInfoMap, &staleConnections) - } - // Check stale connections against endpoints missing from the update. - // TODO: we should really only mark a connection stale if the proto was UDP - // and the (ip, port, proto) was removed from the endpoints. - for svcPort := range proxier.endpointsMap { - if _, found := newEndpointsMap[svcPort]; !found { - glog.V(3).Infof("Removing endpoints for %q", svcPort) - // record endpoints of unactive service to stale connections - for _, ep := range proxier.endpointsMap[svcPort] { - staleConnections[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true - } - } - } - - // Update service health check - allSvcPorts := make(map[proxy.ServicePortName]bool) - for svcPort := range proxier.endpointsMap { - allSvcPorts[svcPort] = true - } - for svcPort := range newEndpointsMap { - allSvcPorts[svcPort] = true - } - for svcPort := range allSvcPorts { - proxier.updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort]) - } - - if len(newEndpointsMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newEndpointsMap, proxier.endpointsMap) { - proxier.endpointsMap = newEndpointsMap + newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, updateHealthCheckEntries) + if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) { + proxier.endpointsMap = newMap proxier.syncProxyRules() } else { glog.V(4).Infof("Skipping proxy iptables rule sync on endpoint update because nothing changed") @@ -632,9 +599,59 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { proxier.deleteEndpointConnections(staleConnections) } +// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. +// TODO: the hcUpdater should be a method on an interface, but it is a global pkg for now +func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, + hcUpdater func(types.NamespacedName, []hostPortInfo)) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { + + // return values + newMap = make(map[proxy.ServicePortName][]*endpointsInfo) + stale = make(map[endpointServicePair]bool) + + // local + svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) + + // Update endpoints for services. + for i := range allEndpoints { + accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap, &stale) + } + // Check stale connections against endpoints missing from the update. + // TODO: we should really only mark a connection stale if the proto was UDP + // and the (ip, port, proto) was removed from the endpoints. + for svcPort := range curMap { + if _, found := newMap[svcPort]; !found { + glog.V(3).Infof("Removing endpoints for %q", svcPort) + // record endpoints of unactive service to stale connections + for _, ep := range curMap[svcPort] { + stale[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true + } + } + } + + // Update service health check + allSvcPorts := make(map[proxy.ServicePortName]bool) + for svcPort := range curMap { + allSvcPorts[svcPort] = true + } + for svcPort := range newMap { + allSvcPorts[svcPort] = true + } + for svcPort := range allSvcPorts { + hcUpdater(svcPort.NamespacedName, svcPortToInfoMap[svcPort]) + } + + return newMap, stale +} + // Gather information about all the endpoint state for a given api.Endpoints. -// This can not detect all stale connections, so the caller should also check -// for entries that were totally removed. +// This can not report complete info on stale connections because it has limited +// scope - it only knows one Endpoints, but sees the whole current map. That +// cleanup has to be done above. +// +// TODO: this could be simplified: +// - hostPortInfo and endpointsInfo overlap too much +// - the test for this is overlapped by the test for updateEndpoints +// - naming is poor and responsibilities are muddled func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, curEndpoints map[proxy.ServicePortName][]*endpointsInfo, newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, @@ -682,7 +699,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, } // updateHealthCheckEntries - send the new set of local endpoints to the health checker -func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { +func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { return } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 194cae464fb..1ebd9af31eb 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1453,4 +1453,627 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName { } } +func Test_updateEndpoints(t *testing.T) { + testCases := []struct { + newEndpoints []api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedResult map[proxy.ServicePortName][]*endpointsInfo + expectedStale []endpointServicePair + }{{ + // Case[0]: nothing + newEndpoints: []api.Endpoints{}, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{}, + }, { + // Case[1]: no change, unnamed port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[2]: no change, named port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[3]: no change, multiple subsets + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p12", + Port: 12, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[4]: no change, multiple subsets, multiple ports + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.3", + }}, + Ports: []api.EndpointPort{{ + Name: "p13", + Port: 13, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[5]: no change, multiple endpoints, subsets, IPs, and ports + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.3", + }, { + IP: "1.1.1.4", + }}, + Ports: []api.EndpointPort{{ + Name: "p13", + Port: 13, + }, { + Name: "p14", + Port: 14, + }}, + }} + }), + makeTestEndpoints("ns2", "ep2", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "2.2.2.1", + }, { + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p21", + Port: 21, + }, { + Name: "p22", + Port: 22, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + {"1.1.1.4:13", false}, + }, + makeServicePortName("ns1", "ep1", "p14"): { + {"1.1.1.3:14", false}, + {"1.1.1.4:14", false}, + }, + makeServicePortName("ns2", "ep2", "p21"): { + {"2.2.2.1:21", false}, + {"2.2.2.2:21", false}, + }, + makeServicePortName("ns2", "ep2", "p22"): { + {"2.2.2.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + makeServicePortName("ns1", "ep1", "p13"): { + {"1.1.1.3:13", false}, + {"1.1.1.4:13", false}, + }, + makeServicePortName("ns1", "ep1", "p14"): { + {"1.1.1.3:14", false}, + {"1.1.1.4:14", false}, + }, + makeServicePortName("ns2", "ep2", "p21"): { + {"2.2.2.1:21", false}, + {"2.2.2.2:21", false}, + }, + makeServicePortName("ns2", "ep2", "p22"): { + {"2.2.2.1:22", false}, + {"2.2.2.2:22", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[6]: add an Endpoints + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ /* empty */ }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[7]: remove an Endpoints + newEndpoints: []api.Endpoints{ /* empty */ }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", ""): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", ""), + }}, + }, { + // Case[8]: add an IP and port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }, { + Name: "p12", + Port: 12, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[9]: remove an IP and port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.2:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.1:12", false}, + {"1.1.1.2:12", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.2:11", + servicePortName: makeServicePortName("ns1", "ep1", "p11"), + }, { + endpoint: "1.1.1.1:12", + servicePortName: makeServicePortName("ns1", "ep1", "p12"), + }, { + endpoint: "1.1.1.2:12", + servicePortName: makeServicePortName("ns1", "ep1", "p12"), + }}, + }, { + // Case[10]: add a subset + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "2.2.2.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p22", + Port: 22, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p22"): { + {"2.2.2.2:22", false}, + }, + }, + expectedStale: []endpointServicePair{}, + }, { + // Case[11]: remove a subset + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns1", "ep1", "p22"): { + {"2.2.2.2:22", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "2.2.2.2:22", + servicePortName: makeServicePortName("ns1", "ep1", "p22"), + }}, + }, { + // Case[12]: rename a port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11-2", + Port: 11, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11-2"): { + {"1.1.1.1:11", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", "p11"), + }}, + }, { + // Case[13]: renumber a port + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 22, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:22", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "1.1.1.1:11", + servicePortName: makeServicePortName("ns1", "ep1", "p11"), + }}, + }, { + // Case[14]: complex add and remove + newEndpoints: []api.Endpoints{ + makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.1", + }, { + IP: "1.1.1.11", + }}, + Ports: []api.EndpointPort{{ + Name: "p11", + Port: 11, + }}, + }, { + Addresses: []api.EndpointAddress{{ + IP: "1.1.1.2", + }}, + Ports: []api.EndpointPort{{ + Name: "p12", + Port: 12, + }, { + Name: "p122", + Port: 122, + }}, + }} + }), + makeTestEndpoints("ns3", "ep3", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "3.3.3.3", + }}, + Ports: []api.EndpointPort{{ + Name: "p33", + Port: 33, + }}, + }} + }), + makeTestEndpoints("ns4", "ep4", func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: "4.4.4.4", + }}, + Ports: []api.EndpointPort{{ + Name: "p44", + Port: 44, + }}, + }} + }), + }, + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + }, + makeServicePortName("ns2", "ep2", "p22"): { + {"2.2.2.2:22", false}, + {"2.2.2.22:22", false}, + }, + makeServicePortName("ns2", "ep2", "p23"): { + {"2.2.2.3:23", false}, + }, + makeServicePortName("ns4", "ep4", "p44"): { + {"4.4.4.4:44", false}, + {"4.4.4.5:44", false}, + }, + makeServicePortName("ns4", "ep4", "p45"): { + {"4.4.4.6:45", false}, + }, + }, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ + makeServicePortName("ns1", "ep1", "p11"): { + {"1.1.1.1:11", false}, + {"1.1.1.11:11", false}, + }, + makeServicePortName("ns1", "ep1", "p12"): { + {"1.1.1.2:12", false}, + }, + makeServicePortName("ns1", "ep1", "p122"): { + {"1.1.1.2:122", false}, + }, + makeServicePortName("ns3", "ep3", "p33"): { + {"3.3.3.3:33", false}, + }, + makeServicePortName("ns4", "ep4", "p44"): { + {"4.4.4.4:44", false}, + }, + }, + expectedStale: []endpointServicePair{{ + endpoint: "2.2.2.2:22", + servicePortName: makeServicePortName("ns2", "ep2", "p22"), + }, { + endpoint: "2.2.2.22:22", + servicePortName: makeServicePortName("ns2", "ep2", "p22"), + }, { + endpoint: "2.2.2.3:23", + servicePortName: makeServicePortName("ns2", "ep2", "p23"), + }, { + endpoint: "4.4.4.5:44", + servicePortName: makeServicePortName("ns4", "ep4", "p44"), + }, { + endpoint: "4.4.4.6:45", + servicePortName: makeServicePortName("ns4", "ep4", "p45"), + }}, + }} + + for tci, tc := range testCases { + newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", func(types.NamespacedName, []hostPortInfo) {}) + if len(newMap) != len(tc.expectedResult) { + t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) + } + for x := range tc.expectedResult { + if len(newMap[x]) != len(tc.expectedResult[x]) { + t.Errorf("[%d] expected %d endpoints for %v, got %d", tci, len(tc.expectedResult[x]), x, len(newMap[x])) + } else { + for i := range tc.expectedResult[x] { + if *(newMap[x][i]) != *(tc.expectedResult[x][i]) { + t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, tc.expectedResult[x][i], newMap[x][i]) + } + } + } + } + if len(stale) != len(tc.expectedStale) { + t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(stale), stale) + } + for _, x := range tc.expectedStale { + if stale[x] != true { + t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale) + } + } + } +} + // TODO(thockin): add *more* tests for syncProxyRules() or break it down further and test the pieces. From cddda17d42c333fbb0749d8fc7c4a22f64b720be Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 3 Feb 2017 18:45:52 -0800 Subject: [PATCH 7/9] Make healthcheck an interface --- pkg/proxy/iptables/BUILD | 1 + pkg/proxy/iptables/proxier.go | 25 +++++++++++++++++++------ pkg/proxy/iptables/proxier_test.go | 8 +++++++- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 0df7cc5a7d4..b1cfcd32fd8 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -50,6 +50,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", + "//vendor:k8s.io/apimachinery/pkg/util/sets", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 284e5370d1a..d0cac007516 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -211,6 +211,7 @@ type Proxier struct { nodeIP net.IP portMapper portOpener recorder record.EventRecorder + healthChecker healthChecker } type localPort struct { @@ -242,6 +243,17 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return openLocalPort(lp) } +type healthChecker interface { + UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) +} + +// TODO: the healthcheck pkg should offer a type +type globalHealthChecker struct{} + +func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { + healthcheck.UpdateEndpoints(serviceName, endpointUIDs) +} + // Proxier implements ProxyProvider var _ proxy.ProxyProvider = &Proxier{} @@ -295,6 +307,7 @@ func NewProxier(ipt utiliptables.Interface, glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") } + healthChecker := globalHealthChecker{} go healthcheck.Run() var throttle flowcontrol.RateLimiter @@ -321,6 +334,7 @@ func NewProxier(ipt utiliptables.Interface, nodeIP: nodeIP, portMapper: &listenPortOpener{}, recorder: recorder, + healthChecker: healthChecker, }, nil } @@ -588,7 +602,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { defer proxier.mu.Unlock() proxier.haveReceivedEndpointsUpdate = true - newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, updateHealthCheckEntries) + newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker) if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) { proxier.endpointsMap = newMap proxier.syncProxyRules() @@ -600,9 +614,8 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { } // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -// TODO: the hcUpdater should be a method on an interface, but it is a global pkg for now func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, - hcUpdater func(types.NamespacedName, []hostPortInfo)) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { + healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { // return values newMap = make(map[proxy.ServicePortName][]*endpointsInfo) @@ -637,7 +650,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN allSvcPorts[svcPort] = true } for svcPort := range allSvcPorts { - hcUpdater(svcPort.NamespacedName, svcPortToInfoMap[svcPort]) + updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker) } return newMap, stale @@ -699,7 +712,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, } // updateHealthCheckEntries - send the new set of local endpoints to the health checker -func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) { +func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo, healthChecker healthChecker) { if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) { return } @@ -712,7 +725,7 @@ func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInf endpoints.Insert(fmt.Sprintf("%s/%s", name.Namespace, name.Name)) } } - healthcheck.UpdateEndpoints(name, endpoints) + healthChecker.UpdateEndpoints(name, endpoints) } // used in OnEndpointsUpdate diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 1ebd9af31eb..48e87c10093 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -28,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" @@ -493,6 +494,10 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return nil, nil } +type fakeHealthChecker struct{} + +func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {} + func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. @@ -507,6 +512,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { hostname: "test-hostname", portsMap: make(map[localPort]closeable), portMapper: &fakePortOpener{[]*localPort{}}, + healthChecker: fakeHealthChecker{}, } } @@ -2050,7 +2056,7 @@ func Test_updateEndpoints(t *testing.T) { }} for tci, tc := range testCases { - newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", func(types.NamespacedName, []hostPortInfo) {}) + newMap, stale := updateEndpoints(tc.newEndpoints, tc.oldEndpoints, "host", fakeHealthChecker{}) if len(newMap) != len(tc.expectedResult) { t.Errorf("[%d] expected %d results, got %d: %v", tci, len(tc.expectedResult), len(newMap), newMap) } From 7046c7efcbaf4fd8aa97d5a095cf14b5a789ef38 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Fri, 3 Feb 2017 18:03:45 -0800 Subject: [PATCH 8/9] Prep to move guts of OnEnpointsUpdate to sync This makes it more obvious that they run together and makes the upcoming rate-limited syncs easier. Also make test use ints for ports, so it's easier to see when a port is a literal value vs a name. --- pkg/proxy/iptables/proxier.go | 33 +++-- pkg/proxy/iptables/proxier_test.go | 215 ++++++++++++++++++++--------- 2 files changed, 168 insertions(+), 80 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index d0cac007516..58f443ad6ae 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -191,13 +191,13 @@ type proxyServiceMap map[proxy.ServicePortName]*serviceInfo // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { - mu sync.Mutex // protects the following fields - serviceMap proxyServiceMap - endpointsMap map[proxy.ServicePortName][]*endpointsInfo - portsMap map[localPort]closeable - haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event - haveReceivedEndpointsUpdate bool // true once we've seen an OnEndpointsUpdate event - throttle flowcontrol.RateLimiter + mu sync.Mutex // protects the following fields + serviceMap proxyServiceMap + endpointsMap map[proxy.ServicePortName][]*endpointsInfo + portsMap map[localPort]closeable + haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event + allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event + throttle flowcontrol.RateLimiter // These are effectively const and do not need the mutex to be held. syncPeriod time.Duration @@ -593,16 +593,15 @@ func buildEndpointInfoList(endPoints []hostPortInfo, endpointIPs []string) []*en // OnEndpointsUpdate takes in a slice of updated endpoints. func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { - start := time.Now() - defer func() { - glog.V(4).Infof("OnEndpointsUpdate took %v for %d endpoints", time.Since(start), len(allEndpoints)) - }() - proxier.mu.Lock() defer proxier.mu.Unlock() - proxier.haveReceivedEndpointsUpdate = true + if proxier.allEndpoints == nil { + glog.V(2).Info("Received first Endpoints update") + } + proxier.allEndpoints = allEndpoints - newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker) + // TODO: once service has made this same transform, move this into proxier.syncProxyRules() + newMap, staleConnections := updateEndpoints(proxier.allEndpoints, proxier.endpointsMap, proxier.hostname, proxier.healthChecker) if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) { proxier.endpointsMap = newMap proxier.syncProxyRules() @@ -874,7 +873,7 @@ func (proxier *Proxier) syncProxyRules() { glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints - if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { + if proxier.allEndpoints == nil || !proxier.haveReceivedServiceUpdate { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } @@ -923,6 +922,10 @@ func (proxier *Proxier) syncProxyRules() { } } + // + // Below this point we will not return until we try to write the iptables rules. + // + // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore existingFilterChains := make(map[utiliptables.Chain]string) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 48e87c10093..b46c8b0e499 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -17,6 +17,7 @@ limitations under the License. package iptables import ( + "strconv" "testing" "github.com/davecgh/go-spew/spew" @@ -292,8 +293,8 @@ func TestDeleteEndpointConnections(t *testing.T) { } serviceMap := make(map[proxy.ServicePortName]*serviceInfo) - svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "80"} - svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "80"} + svc1 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc1"}, Port: "p80"} + svc2 := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: "svc2"}, Port: "p80"} serviceMap[svc1] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 40), 80, api.ProtocolUDP, false) serviceMap[svc2] = newFakeServiceInfo(svc1, net.IPv4(10, 20, 30, 41), 80, api.ProtocolTCP, false) @@ -498,37 +499,39 @@ type fakeHealthChecker struct{} func (fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) {} +const testHostname = "test-hostname" + func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. return &Proxier{ - exec: &exec.FakeExec{}, - serviceMap: make(map[proxy.ServicePortName]*serviceInfo), - iptables: ipt, - endpointsMap: make(map[proxy.ServicePortName][]*endpointsInfo), - clusterCIDR: "10.0.0.0/24", - haveReceivedEndpointsUpdate: true, - haveReceivedServiceUpdate: true, - hostname: "test-hostname", - portsMap: make(map[localPort]closeable), - portMapper: &fakePortOpener{[]*localPort{}}, - healthChecker: fakeHealthChecker{}, + exec: &exec.FakeExec{}, + serviceMap: make(map[proxy.ServicePortName]*serviceInfo), + iptables: ipt, + clusterCIDR: "10.0.0.0/24", + allEndpoints: []api.Endpoints{}, + haveReceivedServiceUpdate: true, + hostname: testHostname, + portsMap: make(map[localPort]closeable), + portMapper: &fakePortOpener{[]*localPort{}}, + healthChecker: fakeHealthChecker{}, } } -func hasJump(rules []iptablestest.Rule, destChain, destIP, destPort string) bool { +func hasJump(rules []iptablestest.Rule, destChain, destIP string, destPort int) bool { + destPortStr := strconv.Itoa(destPort) match := false for _, r := range rules { if r[iptablestest.Jump] == destChain { match = true if destIP != "" { - if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPort) || r[iptablestest.DPort] == "") { + if strings.Contains(r[iptablestest.Destination], destIP) && (strings.Contains(r[iptablestest.DPort], destPortStr) || r[iptablestest.DPort] == "") { return true } match = false } - if destPort != "" { - if strings.Contains(r[iptablestest.DPort], destPort) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") { + if destPort != 0 { + if strings.Contains(r[iptablestest.DPort], destPortStr) && (strings.Contains(r[iptablestest.Destination], destIP) || r[iptablestest.Destination] == "") { return true } match = false @@ -543,7 +546,7 @@ func TestHasJump(t *testing.T) { rules []iptablestest.Rule destChain string destIP string - destPort string + destPort int expected bool }{ "case 1": { @@ -554,7 +557,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "10.20.30.41", - destPort: "80", + destPort: 80, expected: true, }, "case 2": { @@ -565,7 +568,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "", - destPort: "3001", + destPort: 3001, expected: true, }, "case 3": { @@ -575,7 +578,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-XLB-GF53O3C2HZEXL2XN", destIP: "1.2.3.4", - destPort: "80", + destPort: 80, expected: true, }, "case 4": { @@ -585,7 +588,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-XLB-GF53O3C2HZEXL2XN", destIP: "1.2.3.4", - destPort: "8080", + destPort: 8080, expected: false, }, "case 5": { @@ -595,7 +598,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-XLB-GF53O3C2HZEXL2XN", destIP: "10.20.30.40", - destPort: "80", + destPort: 80, expected: false, }, "case 6": { @@ -607,7 +610,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "1.2.3.4", - destPort: "8080", + destPort: 8080, expected: true, }, "case 7": { @@ -618,7 +621,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "1.2.3.4", - destPort: "3001", + destPort: 3001, expected: true, }, "case 8": { @@ -629,7 +632,7 @@ func TestHasJump(t *testing.T) { }, destChain: "REJECT", destIP: "10.20.30.41", - destPort: "8080", + destPort: 8080, expected: true, }, "case 9": { @@ -638,7 +641,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-SEP-LWSOSDSHMKPJHHJV", destIP: "", - destPort: "", + destPort: 0, expected: true, }, "case 10": { @@ -647,7 +650,7 @@ func TestHasJump(t *testing.T) { }, destChain: "KUBE-SEP-BAR", destIP: "", - destPort: "", + destPort: 0, expected: false, }, } @@ -681,7 +684,7 @@ func TestClusterIPReject(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false) fp.syncProxyRules() @@ -691,7 +694,7 @@ func TestClusterIPReject(t *testing.T) { errorf(fmt.Sprintf("Unexpected rule for chain %v service %v without endpoints", svcChain, svcName), svcRules, t) } kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), "80") { + if !hasJump(kubeSvcRules, iptablestest.Reject, svcIP.String(), 80) { errorf(fmt.Sprintf("Failed to find a %v rule for service %v with no endpoints", iptablestest.Reject, svcName), kubeSvcRules, t) } } @@ -702,23 +705,37 @@ func TestClusterIPEndpointsJump(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} fp.serviceMap[svc] = newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true) - ep := "10.180.0.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{ep, false}} + ip := "10.180.0.1" + port := 80 + ep := fmt.Sprintf("%s:%d", ip, port) + allEndpoints := []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip, + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } - fp.syncProxyRules() + fp.OnEndpointsUpdate(allEndpoints) svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP)))) epChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), ep)) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, svcChain, svcIP.String(), "80") { + if !hasJump(kubeSvcRules, svcChain, svcIP.String(), 80) { errorf(fmt.Sprintf("Failed to find jump from KUBE-SERVICES to %v chain", svcChain), kubeSvcRules, t) } svcRules := ipt.GetRules(svcChain) - if !hasJump(svcRules, epChain, "", "") { + if !hasJump(svcRules, epChain, "", 0) { errorf(fmt.Sprintf("Failed to jump to ep chain %v", epChain), svcRules, t) } epRules := ipt.GetRules(epChain) @@ -741,12 +758,25 @@ func TestLoadBalancer(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false) fp.serviceMap[svc] = typeLoadBalancer(svcInfo) - ep1 := "10.180.0.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}} + ip := "10.180.0.1" + port := 80 + fp.allEndpoints = []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip, + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } fp.syncProxyRules() @@ -756,12 +786,12 @@ func TestLoadBalancer(t *testing.T) { //lbChain := string(serviceLBChainName(svc, proto)) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "80") { + if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 80) { errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t) } fwRules := ipt.GetRules(fwChain) - if !hasJump(fwRules, svcChain, "", "") || !hasJump(fwRules, string(KubeMarkMasqChain), "", "") { + if !hasJump(fwRules, svcChain, "", 0) || !hasJump(fwRules, string(KubeMarkMasqChain), "", 0) { errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, svcChain), fwRules, t) } } @@ -772,13 +802,26 @@ func TestNodePort(t *testing.T) { svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, false) svcInfo.nodePort = 3001 fp.serviceMap[svc] = svcInfo - ep1 := "10.180.0.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{ep1, false}} + ip := "10.180.0.1" + port := 80 + fp.allEndpoints = []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip, + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } fp.syncProxyRules() @@ -786,26 +829,49 @@ func TestNodePort(t *testing.T) { svcChain := string(servicePortChainName(svc, strings.ToLower(proto))) kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain)) - if !hasJump(kubeNodePortRules, svcChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) { + if !hasJump(kubeNodePortRules, svcChain, "", svcInfo.nodePort) { errorf(fmt.Sprintf("Failed to find jump to svc chain %v", svcChain), kubeNodePortRules, t) } } +func strPtr(s string) *string { + return &s +} + func TestOnlyLocalLoadBalancing(t *testing.T) { ipt := iptablestest.NewFake() fp := NewFakeProxier(ipt) svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true) fp.serviceMap[svc] = typeLoadBalancer(svcInfo) - nonLocalEp := "10.180.0.1:80" - localEp := "10.180.2.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}} + ip1 := "10.180.0.1" + ip2 := "10.180.2.1" + port := 80 + nonLocalEp := fmt.Sprintf("%s:%d", ip1, port) + localEp := fmt.Sprintf("%s:%d", ip2, port) + allEndpoints := []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip1, + NodeName: nil, + }, { + IP: ip2, + NodeName: strPtr(testHostname), + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } - fp.syncProxyRules() + fp.OnEndpointsUpdate(allEndpoints) proto := strings.ToLower(string(api.ProtocolTCP)) fwChain := string(serviceFirewallChainName(svc, proto)) @@ -815,24 +881,24 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp)) kubeSvcRules := ipt.GetRules(string(kubeServicesChain)) - if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, "") { + if !hasJump(kubeSvcRules, fwChain, svcInfo.loadBalancerStatus.Ingress[0].IP, 0) { errorf(fmt.Sprintf("Failed to find jump to firewall chain %v", fwChain), kubeSvcRules, t) } fwRules := ipt.GetRules(fwChain) - if !hasJump(fwRules, lbChain, "", "") { + if !hasJump(fwRules, lbChain, "", 0) { errorf(fmt.Sprintf("Failed to find jump from firewall chain %v to svc chain %v", fwChain, lbChain), fwRules, t) } - if hasJump(fwRules, string(KubeMarkMasqChain), "", "") { + if hasJump(fwRules, string(KubeMarkMasqChain), "", 0) { errorf(fmt.Sprintf("Found jump from fw chain %v to MASQUERADE", fwChain), fwRules, t) } lbRules := ipt.GetRules(lbChain) - if hasJump(lbRules, nonLocalEpChain, "", "") { + if hasJump(lbRules, nonLocalEpChain, "", 0) { errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t) } - if !hasJump(lbRules, localEpChain, "", "") { - errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t) + if !hasJump(lbRules, localEpChain, "", 0) { + errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, localEp), lbRules, t) } } @@ -855,16 +921,35 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable svcName := "svc1" svcIP := net.IPv4(10, 20, 30, 41) - svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "80"} + svc := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "ns1", Name: svcName}, Port: "p80"} svcInfo := newFakeServiceInfo(svc, svcIP, 80, api.ProtocolTCP, true) svcInfo.nodePort = 3001 fp.serviceMap[svc] = svcInfo - nonLocalEp := "10.180.0.1:80" - localEp := "10.180.2.1:80" - fp.endpointsMap[svc] = []*endpointsInfo{{nonLocalEp, false}, {localEp, true}} + ip1 := "10.180.0.1" + ip2 := "10.180.2.1" + port := 80 + nonLocalEp := fmt.Sprintf("%s:%d", ip1, port) + localEp := fmt.Sprintf("%s:%d", ip2, port) + allEndpoints := []api.Endpoints{ + makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { + ept.Subsets = []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{ + IP: ip1, + NodeName: nil, + }, { + IP: ip2, + NodeName: strPtr(testHostname), + }}, + Ports: []api.EndpointPort{{ + Name: "p80", + Port: int32(port), + }}, + }} + }), + } - fp.syncProxyRules() + fp.OnEndpointsUpdate(allEndpoints) proto := strings.ToLower(string(api.ProtocolTCP)) lbChain := string(serviceLBChainName(svc, proto)) @@ -873,23 +958,23 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable localEpChain := string(servicePortEndpointChainName(svc, strings.ToLower(string(api.ProtocolTCP)), localEp)) kubeNodePortRules := ipt.GetRules(string(kubeNodePortsChain)) - if !hasJump(kubeNodePortRules, lbChain, "", fmt.Sprintf("%v", svcInfo.nodePort)) { + if !hasJump(kubeNodePortRules, lbChain, "", svcInfo.nodePort) { errorf(fmt.Sprintf("Failed to find jump to lb chain %v", lbChain), kubeNodePortRules, t) } svcChain := string(servicePortChainName(svc, strings.ToLower(string(api.ProtocolTCP)))) lbRules := ipt.GetRules(lbChain) - if hasJump(lbRules, nonLocalEpChain, "", "") { + if hasJump(lbRules, nonLocalEpChain, "", 0) { errorf(fmt.Sprintf("Found jump from lb chain %v to non-local ep %v", lbChain, nonLocalEp), lbRules, t) } - if hasJump(lbRules, svcChain, "", "") != shouldLBTOSVCRuleExist { + if hasJump(lbRules, svcChain, "", 0) != shouldLBTOSVCRuleExist { prefix := "Did not find " if !shouldLBTOSVCRuleExist { prefix = "Found " } errorf(fmt.Sprintf("%s jump from lb chain %v to svc %v", prefix, lbChain, svcChain), lbRules, t) } - if !hasJump(lbRules, localEpChain, "", "") { + if !hasJump(lbRules, localEpChain, "", 0) { errorf(fmt.Sprintf("Didn't find jump from lb chain %v to local ep %v", lbChain, nonLocalEp), lbRules, t) } } From 1ce3395e7f201c230de92d98ac7bca8d56234836 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 15 Feb 2017 10:36:01 -0800 Subject: [PATCH 9/9] Simplify stale-connection detection in kube-proxy --- pkg/proxy/iptables/BUILD | 1 - pkg/proxy/iptables/proxier.go | 65 ++++++----------------- pkg/proxy/iptables/proxier_test.go | 83 +++--------------------------- 3 files changed, 25 insertions(+), 124 deletions(-) diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index b1cfcd32fd8..821b14da8a3 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -20,7 +20,6 @@ go_library( "//pkg/proxy/healthcheck:go_default_library", "//pkg/util/exec:go_default_library", "//pkg/util/iptables:go_default_library", - "//pkg/util/slice:go_default_library", "//pkg/util/sysctl:go_default_library", "//pkg/util/version:go_default_library", "//vendor:github.com/davecgh/go-spew/spew", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 58f443ad6ae..6208dc00128 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -48,7 +48,6 @@ import ( "k8s.io/kubernetes/pkg/proxy/healthcheck" utilexec "k8s.io/kubernetes/pkg/util/exec" utiliptables "k8s.io/kubernetes/pkg/util/iptables" - "k8s.io/kubernetes/pkg/util/slice" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" utilversion "k8s.io/kubernetes/pkg/util/version" ) @@ -556,15 +555,6 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { proxier.deleteServiceConnections(staleUDPServices.List()) } -// Generate a list of ip strings from the list of endpoint infos -func flattenEndpointsInfo(endPoints []*endpointsInfo) []string { - var endpointIPs []string - for _, ep := range endPoints { - endpointIPs = append(endpointIPs, ep.endpoint) - } - return endpointIPs -} - // Reconstruct the list of endpoint infos from the endpointIP list // Use the slice of endpointIPs to rebuild a slice of corresponding {endpointIP, localEndpointOnly} infos // from the full []hostPortInfo slice. @@ -614,28 +604,34 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string, - healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) { + healthChecker healthChecker) (newMap map[proxy.ServicePortName][]*endpointsInfo, staleSet map[endpointServicePair]bool) { // return values newMap = make(map[proxy.ServicePortName][]*endpointsInfo) - stale = make(map[endpointServicePair]bool) + staleSet = make(map[endpointServicePair]bool) // local svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) // Update endpoints for services. for i := range allEndpoints { - accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap, &stale) + accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP // and the (ip, port, proto) was removed from the endpoints. - for svcPort := range curMap { - if _, found := newMap[svcPort]; !found { - glog.V(3).Infof("Removing endpoints for %q", svcPort) - // record endpoints of unactive service to stale connections - for _, ep := range curMap[svcPort] { - stale[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true + for svcPort, epList := range curMap { + for _, ep := range epList { + stale := true + for i := range newMap[svcPort] { + if *newMap[svcPort][i] == *ep { + stale = false + break + } + } + if stale { + glog.V(4).Infof("Stale endpoint %v -> %v", svcPort, ep.endpoint) + staleSet[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true } } } @@ -652,7 +648,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort], healthChecker) } - return newMap, stale + return newMap, staleSet } // Gather information about all the endpoint state for a given api.Endpoints. @@ -667,8 +663,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortN func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, curEndpoints map[proxy.ServicePortName][]*endpointsInfo, newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, - svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, - staleConnections *map[endpointServicePair]bool) { + svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. @@ -694,16 +689,6 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, // Decompose the lists of endpoints into details of what was changed for the caller. for svcPort, hostPortInfos := range *svcPortToInfoMap { newEPList := flattenValidEndpoints(hostPortInfos) - // Flatten the list of current endpoint infos to just a list of ips as strings - curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort]) - if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) { - glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints) - // Gather stale connections to removed endpoints - removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEPList) - for _, ep := range removedEndpoints { - (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true - } - } glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) // Once the set operations using the list of ips are complete, build the list of endpoint infos (*newEndpoints)[svcPort] = buildEndpointInfoList(hostPortInfos, newEPList) @@ -738,17 +723,6 @@ func isValidEndpoint(hpp *hostPortInfo) bool { return hpp.host != "" && hpp.port > 0 } -// Tests whether two slices are equivalent. This sorts both slices in-place. -func slicesEquiv(lhs, rhs []string) bool { - if len(lhs) != len(rhs) { - return false - } - if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) { - return true - } - return false -} - func flattenValidEndpoints(endpoints []hostPortInfo) []string { // Convert Endpoint objects into strings for easier use later. var result []string @@ -803,11 +777,6 @@ func servicePortEndpointChainName(s proxy.ServicePortName, protocol string, endp return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) } -// getRemovedEndpoints returns the endpoint IPs that are missing in the new endpoints -func getRemovedEndpoints(curEndpoints, newEndpoints []string) []string { - return sets.NewString(curEndpoints...).Difference(sets.NewString(newEndpoints...)).List() -} - type endpointServicePair struct { endpoint string servicePortName proxy.ServicePortName diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index b46c8b0e499..7fbc27c1d42 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -172,47 +172,6 @@ func TestGetChainLinesMultipleTables(t *testing.T) { checkAllLines(t, utiliptables.TableNAT, []byte(iptables_save), expected) } -func TestGetRemovedEndpoints(t *testing.T) { - testCases := []struct { - currentEndpoints []string - newEndpoints []string - removedEndpoints []string - }{ - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.3:80"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{"10.0.2.3:80"}, - }, - { - currentEndpoints: []string{}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - newEndpoints: []string{}, - removedEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - }, - { - currentEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80", "10.0.2.2:443"}, - newEndpoints: []string{"10.0.2.1:80", "10.0.2.2:80"}, - removedEndpoints: []string{"10.0.2.2:443"}, - }, - } - - for i := range testCases { - res := getRemovedEndpoints(testCases[i].currentEndpoints, testCases[i].newEndpoints) - if !slicesEquiv(res, testCases[i].removedEndpoints) { - t.Errorf("Expected: %v, but getRemovedEndpoints returned: %v", testCases[i].removedEndpoints, res) - } - } -} - func TestExecConntrackTool(t *testing.T) { fcmd := exec.FakeCmd{ CombinedOutputScript: []exec.FakeCombinedOutputAction{ @@ -1268,16 +1227,14 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // This is a coarse test, but it offers some modicum of confidence as the code is evolved. func Test_accumulateEndpointsMap(t *testing.T) { testCases := []struct { - newEndpoints api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedNew map[proxy.ServicePortName][]*endpointsInfo - expectedStale []endpointServicePair + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo }{{ // Case[0]: nothing - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{}, + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, }, { // Case[1]: no changes, unnamed port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1303,7 +1260,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[2]: no changes, named port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1329,7 +1285,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[3]: new port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1352,7 +1307,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[4]: remove port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), @@ -1361,8 +1315,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStale: []endpointServicePair{ /* can't detect this one */ }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, }, { // Case[5]: new IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1398,7 +1351,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"2.2.2.2:22", false}, }, }, - expectedStale: []endpointServicePair{}, }, { // Case[6]: remove IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1429,10 +1381,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{{ - endpoint: "2.2.2.2:11", - servicePortName: makeServicePortName("ns1", "ep1", "p1"), - }}, }, { // Case[7]: rename port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1458,7 +1406,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedStale: []endpointServicePair{ /* can't detect this one */ }, }, { // Case[8]: renumber port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1484,19 +1431,13 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:22", false}, }, }, - expectedStale: []endpointServicePair{{ - endpoint: "1.1.1.1:11", - servicePortName: makeServicePortName("ns1", "ep1", "p1"), - }}, }} for tci, tc := range testCases { // outputs newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} - staleConnections := map[endpointServicePair]bool{} - accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, - &newEndpoints, &svcPortToInfoMap, &staleConnections) + accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints, &svcPortToInfoMap) if len(newEndpoints) != len(tc.expectedNew) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) @@ -1512,14 +1453,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { } } } - if len(staleConnections) != len(tc.expectedStale) { - t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) - } - for _, x := range tc.expectedStale { - if staleConnections[x] != true { - t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, staleConnections) - } - } } }