Sanitize newEndpoints semantics, remove a dup arg

This commit is contained in:
Tim Hockin 2017-02-02 17:24:38 -08:00
parent 9507af3c79
commit 48ea304711
2 changed files with 20 additions and 50 deletions

View File

@ -588,7 +588,6 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
defer proxier.mu.Unlock() defer proxier.mu.Unlock()
proxier.haveReceivedEndpointsUpdate = true proxier.haveReceivedEndpointsUpdate = true
activeEndpoints := make(map[proxy.ServicePortName]bool) // use a map as a set
staleConnections := make(map[endpointServicePair]bool) staleConnections := make(map[endpointServicePair]bool)
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo) svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo) newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo)
@ -596,14 +595,14 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
// Update endpoints for services. // Update endpoints for services.
for i := range allEndpoints { for i := range allEndpoints {
accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap, accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap,
&svcPortToInfoMap, &staleConnections, &activeEndpoints) &svcPortToInfoMap, &staleConnections)
} }
// Check stale connections against endpoints missing from the update. // Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP // TODO: we should really only mark a connection stale if the proto was UDP
// and the (ip, port, proto) was removed from the endpoints. // and the (ip, port, proto) was removed from the endpoints.
for svcPort := range proxier.endpointsMap { for svcPort := range proxier.endpointsMap {
if !activeEndpoints[svcPort] { if _, found := newEndpointsMap[svcPort]; !found {
glog.V(2).Infof("Removing endpoints for %q", svcPort) glog.V(3).Infof("Removing endpoints for %q", svcPort)
// record endpoints of unactive service to stale connections // record endpoints of unactive service to stale connections
for _, ep := range proxier.endpointsMap[svcPort] { for _, ep := range proxier.endpointsMap[svcPort] {
staleConnections[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true staleConnections[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
@ -640,8 +639,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
curEndpoints map[proxy.ServicePortName][]*endpointsInfo, curEndpoints map[proxy.ServicePortName][]*endpointsInfo,
newEndpoints *map[proxy.ServicePortName][]*endpointsInfo, newEndpoints *map[proxy.ServicePortName][]*endpointsInfo,
svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo, svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo,
staleConnections *map[endpointServicePair]bool, staleConnections *map[endpointServicePair]bool) {
activeEndpoints *map[proxy.ServicePortName]bool) {
// We need to build a map of portname -> all ip:ports for that // We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure. // portname. Explode Endpoints.Subsets[*] into this structure.
@ -678,9 +676,9 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
(*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true (*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
} }
} }
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEPList)
// Once the set operations using the list of ips are complete, build the list of endpoint infos // Once the set operations using the list of ips are complete, build the list of endpoint infos
(*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList) (*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList)
(*activeEndpoints)[svcPort] = true
} }
} }

View File

@ -1177,18 +1177,16 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// This is a coarse test, but it offers some modicum of confidence as the code is evolved. // This is a coarse test, but it offers some modicum of confidence as the code is evolved.
func Test_accumulateEndpointsMap(t *testing.T) { func Test_accumulateEndpointsMap(t *testing.T) {
testCases := []struct { testCases := []struct {
newEndpoints api.Endpoints newEndpoints api.Endpoints
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
expectedNew map[proxy.ServicePortName][]*endpointsInfo expectedNew map[proxy.ServicePortName][]*endpointsInfo
expectedActive []proxy.ServicePortName expectedStale []endpointServicePair
expectedStale []endpointServicePair
}{{ }{{
// Case[0]: nothing // Case[0]: nothing
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}),
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, expectedNew: map[proxy.ServicePortName][]*endpointsInfo{},
expectedActive: []proxy.ServicePortName{}, expectedStale: []endpointServicePair{},
expectedStale: []endpointServicePair{},
}, { }, {
// Case[1]: no changes, unnamed port // Case[1]: no changes, unnamed port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
@ -1214,8 +1212,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"1.1.1.1:11", false}, {"1.1.1.1:11", false},
}, },
}, },
expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, expectedStale: []endpointServicePair{},
expectedStale: []endpointServicePair{},
}, { }, {
// Case[2]: no changes, named port // Case[2]: no changes, named port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
@ -1241,8 +1238,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"1.1.1.1:11", false}, {"1.1.1.1:11", false},
}, },
}, },
expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "port")}, expectedStale: []endpointServicePair{},
expectedStale: []endpointServicePair{},
}, { }, {
// Case[3]: new port // Case[3]: new port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
@ -1265,8 +1261,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"1.1.1.1:11", false}, {"1.1.1.1:11", false},
}, },
}, },
expectedActive: []proxy.ServicePortName{makeServicePortName("ns1", "ep1", "")}, expectedStale: []endpointServicePair{},
expectedStale: []endpointServicePair{},
}, { }, {
// Case[4]: remove port // Case[4]: remove port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}), newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {}),
@ -1275,9 +1270,8 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"1.1.1.1:11", false}, {"1.1.1.1:11", false},
}, },
}, },
expectedNew: map[proxy.ServicePortName][]*endpointsInfo{}, expectedNew: map[proxy.ServicePortName][]*endpointsInfo{},
expectedActive: []proxy.ServicePortName{}, expectedStale: []endpointServicePair{ /* can't detect this one */ },
expectedStale: []endpointServicePair{ /* can't detect this one */ },
}, { }, {
// Case[5]: new IP and port // Case[5]: new IP and port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
@ -1313,10 +1307,6 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"2.2.2.2:22", false}, {"2.2.2.2:22", false},
}, },
}, },
expectedActive: []proxy.ServicePortName{
makeServicePortName("ns1", "ep1", "p1"),
makeServicePortName("ns1", "ep1", "p2"),
},
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
}, { }, {
// Case[6]: remove IP and port // Case[6]: remove IP and port
@ -1348,9 +1338,6 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"1.1.1.1:11", false}, {"1.1.1.1:11", false},
}, },
}, },
expectedActive: []proxy.ServicePortName{
makeServicePortName("ns1", "ep1", "p1"),
},
expectedStale: []endpointServicePair{{ expectedStale: []endpointServicePair{{
endpoint: "2.2.2.2:11", endpoint: "2.2.2.2:11",
servicePortName: makeServicePortName("ns1", "ep1", "p1"), servicePortName: makeServicePortName("ns1", "ep1", "p1"),
@ -1380,10 +1367,7 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"1.1.1.1:11", false}, {"1.1.1.1:11", false},
}, },
}, },
expectedActive: []proxy.ServicePortName{ expectedStale: []endpointServicePair{ /* can't detect this one */ },
makeServicePortName("ns1", "ep1", "p2"),
},
expectedStale: []endpointServicePair{},
}, { }, {
// Case[8]: renumber port // Case[8]: renumber port
newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { newEndpoints: makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) {
@ -1409,9 +1393,6 @@ func Test_accumulateEndpointsMap(t *testing.T) {
{"1.1.1.1:22", false}, {"1.1.1.1:22", false},
}, },
}, },
expectedActive: []proxy.ServicePortName{
makeServicePortName("ns1", "ep1", "p1"),
},
expectedStale: []endpointServicePair{{ expectedStale: []endpointServicePair{{
endpoint: "1.1.1.1:11", endpoint: "1.1.1.1:11",
servicePortName: makeServicePortName("ns1", "ep1", "p1"), servicePortName: makeServicePortName("ns1", "ep1", "p1"),
@ -1423,9 +1404,8 @@ func Test_accumulateEndpointsMap(t *testing.T) {
newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{} newEndpoints := map[proxy.ServicePortName][]*endpointsInfo{}
svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{} svcPortToInfoMap := map[proxy.ServicePortName][]hostPortInfo{}
staleConnections := map[endpointServicePair]bool{} staleConnections := map[endpointServicePair]bool{}
activeEndpoints := map[proxy.ServicePortName]bool{}
accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints,
&newEndpoints, &svcPortToInfoMap, &staleConnections, &activeEndpoints) &newEndpoints, &svcPortToInfoMap, &staleConnections)
if len(newEndpoints) != len(tc.expectedNew) { if len(newEndpoints) != len(tc.expectedNew) {
t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints))
@ -1441,14 +1421,6 @@ func Test_accumulateEndpointsMap(t *testing.T) {
} }
} }
} }
if len(activeEndpoints) != len(tc.expectedActive) {
t.Errorf("[%d] expected %d active, got %d: %v", tci, len(tc.expectedActive), len(activeEndpoints), activeEndpoints)
}
for _, x := range tc.expectedActive {
if activeEndpoints[x] != true {
t.Errorf("[%d] expected active[%v], but didn't find it: %v", tci, x, activeEndpoints)
}
}
if len(staleConnections) != len(tc.expectedStale) { if len(staleConnections) != len(tc.expectedStale) {
t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections) t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(staleConnections), staleConnections)
} }