Refactor OnEndpointsUpdate for testing

This is a weird function, but I didn't want to change any semantics
until the tests are in place.  Testing exposed one bug where stale
connections of renamed ports were not marked stale.

There are other things that seem wrong here, more will follow.
This commit is contained in:
Tim Hockin
2017-02-01 12:47:29 -08:00
parent d578105a44
commit 9507af3c79
3 changed files with 365 additions and 39 deletions

View File

@@ -595,47 +595,12 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
// Update endpoints for services.
for i := range allEndpoints {
svcEndpoints := &allEndpoints[i]
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
portsToEndpoints := map[string][]hostPortInfo{}
for i := range svcEndpoints.Subsets {
ss := &svcEndpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
hostPortObject := hostPortInfo{
host: addr.IP,
port: int(port.Port),
isLocal: addr.NodeName != nil && *addr.NodeName == proxier.hostname,
}
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject)
}
}
}
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname}
svcPortToInfoMap[svcPort] = portsToEndpoints[portname]
curEndpoints := proxier.endpointsMap[svcPort]
newEndpoints := flattenValidEndpoints(portsToEndpoints[portname])
// Flatten the list of current endpoint infos to just a list of ips as strings
curEndpointIPs := flattenEndpointsInfo(curEndpoints)
if len(curEndpointIPs) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEndpoints) {
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
// Gather stale connections to removed endpoints
removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEndpoints)
for _, ep := range removedEndpoints {
staleConnections[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
}
}
// Once the set operations using the list of ips are complete, build the list of endpoint infos
newEndpointsMap[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEndpoints)
activeEndpoints[svcPort] = true
}
accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap,
&svcPortToInfoMap, &staleConnections, &activeEndpoints)
}
// Check stale connections against endpoints missing from the update.
// TODO: we should really only mark a connection stale if the proto was UDP
// and the (ip, port, proto) was removed from the endpoints.
for svcPort := range proxier.endpointsMap {
if !activeEndpoints[svcPort] {
glog.V(2).Infof("Removing endpoints for %q", svcPort)
@@ -668,6 +633,57 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
proxier.deleteEndpointConnections(staleConnections)
}
// Gather information about all the endpoint state for a given api.Endpoints.
// This can not detect all stale connections, so the caller should also check
// for entries that were totally removed.
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
curEndpoints map[proxy.ServicePortName][]*endpointsInfo,
newEndpoints *map[proxy.ServicePortName][]*endpointsInfo,
svcPortToInfoMap *map[proxy.ServicePortName][]hostPortInfo,
staleConnections *map[endpointServicePair]bool,
activeEndpoints *map[proxy.ServicePortName]bool) {
// We need to build a map of portname -> all ip:ports for that
// portname. Explode Endpoints.Subsets[*] into this structure.
portsToEndpoints := map[string][]hostPortInfo{}
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
for i := range ss.Addresses {
addr := &ss.Addresses[i]
hostPortObject := hostPortInfo{
host: addr.IP,
port: int(port.Port),
isLocal: addr.NodeName != nil && *addr.NodeName == hostname,
}
portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortObject)
}
}
}
for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name},
Port: portname,
}
(*svcPortToInfoMap)[svcPort] = portsToEndpoints[portname]
newEPList := flattenValidEndpoints(portsToEndpoints[portname])
// Flatten the list of current endpoint infos to just a list of ips as strings
curEndpointIPs := flattenEndpointsInfo(curEndpoints[svcPort])
if len(curEndpointIPs) != len(newEPList) || !slicesEquiv(slice.CopyStrings(curEndpointIPs), newEPList) {
glog.V(3).Infof("Setting endpoints for %q to %+v", svcPort, newEndpoints)
// Gather stale connections to removed endpoints
removedEndpoints := getRemovedEndpoints(curEndpointIPs, newEPList)
for _, ep := range removedEndpoints {
(*staleConnections)[endpointServicePair{endpoint: ep, servicePortName: svcPort}] = true
}
}
// Once the set operations using the list of ips are complete, build the list of endpoint infos
(*newEndpoints)[svcPort] = buildEndpointInfoList(portsToEndpoints[portname], newEPList)
(*activeEndpoints)[svcPort] = true
}
}
// updateHealthCheckEntries - send the new set of local endpoints to the health checker
func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) {
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {