mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-05 23:47:50 +00:00
Add tests for updateEndpoints
This commit is contained in:
@@ -588,42 +588,9 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
defer proxier.mu.Unlock()
|
||||
proxier.haveReceivedEndpointsUpdate = true
|
||||
|
||||
staleConnections := make(map[endpointServicePair]bool)
|
||||
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
|
||||
newEndpointsMap := make(map[proxy.ServicePortName][]*endpointsInfo)
|
||||
|
||||
// Update endpoints for services.
|
||||
for i := range allEndpoints {
|
||||
accumulateEndpointsMap(&allEndpoints[i], proxier.hostname, proxier.endpointsMap, &newEndpointsMap,
|
||||
&svcPortToInfoMap, &staleConnections)
|
||||
}
|
||||
// Check stale connections against endpoints missing from the update.
|
||||
// TODO: we should really only mark a connection stale if the proto was UDP
|
||||
// and the (ip, port, proto) was removed from the endpoints.
|
||||
for svcPort := range proxier.endpointsMap {
|
||||
if _, found := newEndpointsMap[svcPort]; !found {
|
||||
glog.V(3).Infof("Removing endpoints for %q", svcPort)
|
||||
// record endpoints of unactive service to stale connections
|
||||
for _, ep := range proxier.endpointsMap[svcPort] {
|
||||
staleConnections[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update service health check
|
||||
allSvcPorts := make(map[proxy.ServicePortName]bool)
|
||||
for svcPort := range proxier.endpointsMap {
|
||||
allSvcPorts[svcPort] = true
|
||||
}
|
||||
for svcPort := range newEndpointsMap {
|
||||
allSvcPorts[svcPort] = true
|
||||
}
|
||||
for svcPort := range allSvcPorts {
|
||||
proxier.updateHealthCheckEntries(svcPort.NamespacedName, svcPortToInfoMap[svcPort])
|
||||
}
|
||||
|
||||
if len(newEndpointsMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newEndpointsMap, proxier.endpointsMap) {
|
||||
proxier.endpointsMap = newEndpointsMap
|
||||
newMap, staleConnections := updateEndpoints(allEndpoints, proxier.endpointsMap, proxier.hostname, updateHealthCheckEntries)
|
||||
if len(newMap) != len(proxier.endpointsMap) || !reflect.DeepEqual(newMap, proxier.endpointsMap) {
|
||||
proxier.endpointsMap = newMap
|
||||
proxier.syncProxyRules()
|
||||
} else {
|
||||
glog.V(4).Infof("Skipping proxy iptables rule sync on endpoint update because nothing changed")
|
||||
@@ -632,9 +599,59 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) {
|
||||
proxier.deleteEndpointConnections(staleConnections)
|
||||
}
|
||||
|
||||
// Convert a slice of api.Endpoints objects into a map of service-port -> endpoints.
|
||||
// TODO: the hcUpdater should be a method on an interface, but it is a global pkg for now
|
||||
func updateEndpoints(allEndpoints []api.Endpoints, curMap map[proxy.ServicePortName][]*endpointsInfo, hostname string,
|
||||
hcUpdater func(types.NamespacedName, []hostPortInfo)) (newMap map[proxy.ServicePortName][]*endpointsInfo, stale map[endpointServicePair]bool) {
|
||||
|
||||
// return values
|
||||
newMap = make(map[proxy.ServicePortName][]*endpointsInfo)
|
||||
stale = make(map[endpointServicePair]bool)
|
||||
|
||||
// local
|
||||
svcPortToInfoMap := make(map[proxy.ServicePortName][]hostPortInfo)
|
||||
|
||||
// Update endpoints for services.
|
||||
for i := range allEndpoints {
|
||||
accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap, &svcPortToInfoMap, &stale)
|
||||
}
|
||||
// Check stale connections against endpoints missing from the update.
|
||||
// TODO: we should really only mark a connection stale if the proto was UDP
|
||||
// and the (ip, port, proto) was removed from the endpoints.
|
||||
for svcPort := range curMap {
|
||||
if _, found := newMap[svcPort]; !found {
|
||||
glog.V(3).Infof("Removing endpoints for %q", svcPort)
|
||||
// record endpoints of unactive service to stale connections
|
||||
for _, ep := range curMap[svcPort] {
|
||||
stale[endpointServicePair{endpoint: ep.endpoint, servicePortName: svcPort}] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update service health check
|
||||
allSvcPorts := make(map[proxy.ServicePortName]bool)
|
||||
for svcPort := range curMap {
|
||||
allSvcPorts[svcPort] = true
|
||||
}
|
||||
for svcPort := range newMap {
|
||||
allSvcPorts[svcPort] = true
|
||||
}
|
||||
for svcPort := range allSvcPorts {
|
||||
hcUpdater(svcPort.NamespacedName, svcPortToInfoMap[svcPort])
|
||||
}
|
||||
|
||||
return newMap, stale
|
||||
}
|
||||
|
||||
// Gather information about all the endpoint state for a given api.Endpoints.
|
||||
// This can not detect all stale connections, so the caller should also check
|
||||
// for entries that were totally removed.
|
||||
// This can not report complete info on stale connections because it has limited
|
||||
// scope - it only knows one Endpoints, but sees the whole current map. That
|
||||
// cleanup has to be done above.
|
||||
//
|
||||
// TODO: this could be simplified:
|
||||
// - hostPortInfo and endpointsInfo overlap too much
|
||||
// - the test for this is overlapped by the test for updateEndpoints
|
||||
// - naming is poor and responsibilities are muddled
|
||||
func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
|
||||
curEndpoints map[proxy.ServicePortName][]*endpointsInfo,
|
||||
newEndpoints *map[proxy.ServicePortName][]*endpointsInfo,
|
||||
@@ -682,7 +699,7 @@ func accumulateEndpointsMap(endpoints *api.Endpoints, hostname string,
|
||||
}
|
||||
|
||||
// updateHealthCheckEntries - send the new set of local endpoints to the health checker
|
||||
func (proxier *Proxier) updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) {
|
||||
func updateHealthCheckEntries(name types.NamespacedName, hostPorts []hostPortInfo) {
|
||||
if !utilfeature.DefaultFeatureGate.Enabled(features.ExternalTrafficLocalOnly) {
|
||||
return
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user