From 48ea3047110abc0b532f3178e072ece8c7504903 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 2 Feb 2017 17:24:38 -0800 Subject: [PATCH] Sanitize newEndpoints semantics, remove a dup arg --- pkg/proxy/iptables/proxier.go | 12 +++---- pkg/proxy/iptables/proxier_test.go | 58 ++++++++---------------------- 2 files changed, 20 insertions(+), 50 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 5b27d6644f6..deba2cc6f83 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -588,7 +588,6 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { defer proxier.mu.Unlock() proxier.haveReceivedEndpointsUpdate = true - activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set staleConnections := make(map[endpointServicePair]bool) svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo) @@ -596,14 +595,14 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { // Update endpoints for services. for i := range allEndpoints { accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap, - &svcPortToInfoMap, &staleConnections, &activeEndpoints) + &svcPortToInfoMap, &staleConnections) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP // and the (ip, port, proto) was removed from the endpoints. for svcPort := range proxier.endpointsMap { - if !activeEndpoints[svcPort] { - glog.V(2).Infof("Removing endpoints for %q", svcPort) + if _, found := newEndpointsMap[svcPort]; !found { + glog.V(3).Infof("Removing endpoints for %q", svcPort) // record endpoints of unactive service to stale connections for _, ep := range proxier.endpointsMap[svcPort] { staleConnections[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true @@ -640,8 +639,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, curEndpoints map[proxy.ServicePortName][]*endpointsInfo, newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, - staleConnections *map[endpointServicePair]bool, - activeEndpoints *map[proxy.ServicePortName]bool) { + staleConnections *map[endpointServicePair]bool) { // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. @@ -678,9 +676,9 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string, (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true } } + glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList) // Once the set operations using the list of ips are complete, build the list of endpoint infos (*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList) - (*activeEndpoints)[svcPort] = true } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index e763b95d36f..194cae464fb 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -1177,18 +1177,16 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // This is a coarse test, but it offers some modicum of confidence as the code is evolved. func Test_accumulateEndpointsMap(t *testing.T) { testCases := []struct { - newEndpoints api.Endpoints - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedNew map[proxy.ServicePortName][]*endpointsInfo - expectedActive []proxy.ServicePortName - expectedStale []endpointServicePair + newEndpoints api.Endpoints + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedNew map[proxy.ServicePortName][]*endpointsInfo + expectedStale []endpointServicePair }{{ // Case[0]: nothing - newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedActive: []proxy.ServicePortName{}, - expectedStale: []endpointServicePair{}, + newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{}, }, { // Case[1]: no changes, unnamed port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1214,8 +1212,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, }, { // Case[2]: no changes, named port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1241,8 +1238,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "port")}, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, }, { // Case[3]: new port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1265,8 +1261,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{}, }, { // Case[4]: remove port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), @@ -1275,9 +1270,8 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedActive: []proxy.ServicePortName{}, - expectedStale: []endpointServicePair{ /* can't detect this one */ }, + expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedStale: []endpointServicePair{ /* can't detect this one */ }, }, { // Case[5]: new IP and port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1313,10 +1307,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"2.2.2.2:22", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p1"), - makeServicePortName("ns1", "ep1", "p2"), - }, expectedStale: []endpointServicePair{}, }, { // Case[6]: remove IP and port @@ -1348,9 +1338,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p1"), - }, expectedStale: []endpointServicePair{{ endpoint: "2.2.2.2:11", servicePortName: makeServicePortName("ns1", "ep1", "p1"), @@ -1380,10 +1367,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:11", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p2"), - }, - expectedStale: []endpointServicePair{}, + expectedStale: []endpointServicePair{ /* can't detect this one */ }, }, { // Case[8]: renumber port newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { @@ -1409,9 +1393,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { {"1.1.1.1:22", false}, }, }, - expectedActive: []proxy.ServicePortName{ - makeServicePortName("ns1", "ep1", "p1"), - }, expectedStale: []endpointServicePair{{ endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p1"), @@ -1423,9 +1404,8 @@ func Test_accumulateEndpointsMap(t *testing.T) { newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} staleConnections := map[endpointServicePair]bool{} - activeEndpoints := map[proxy.ServicePortName]bool{} accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, - &newEndpoints, &svcPortToInfoMap, &staleConnections, &activeEndpoints) + &newEndpoints, &svcPortToInfoMap, &staleConnections) if len(newEndpoints) != len(tc.expectedNew) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) @@ -1441,14 +1421,6 @@ func Test_accumulateEndpointsMap(t *testing.T) { } } } - if len(activeEndpoints) != len(tc.expectedActive) { - t.Errorf("[%d] expected %d active, got %d: %v", tci, len(tc.expectedActive), len(activeEndpoints), activeEndpoints) - } - for _, x := range tc.expectedActive { - if activeEndpoints[x] != true { - t.Errorf("[%d] expected active[%v], but didn't find it: %v", tci, x, activeEndpoints) - } - } if len(staleConnections) != len(tc.expectedStale) { t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) }