mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-09 13:12:20 +00:00
Merge pull request #48524 from freehan/udp-service-flush
Automatic merge from submit-queue (batch tested with PRs 48374, 48524, 48519, 42548, 48615) flush conntrack for udp service when # of backend changes from 0 fixes: #48370
This commit is contained in:
@@ -250,6 +250,17 @@ type serviceChangeMap struct {
|
||||
items map[types.NamespacedName]*serviceChange
|
||||
}
|
||||
|
||||
type updateEndpointMapResult struct {
|
||||
hcEndpoints map[types.NamespacedName]int
|
||||
staleEndpoints map[endpointServicePair]bool
|
||||
staleServiceNames map[proxy.ServicePortName]bool
|
||||
}
|
||||
|
||||
type updateServiceMapResult struct {
|
||||
hcServices map[types.NamespacedName]uint16
|
||||
staleServices sets.String
|
||||
}
|
||||
|
||||
type proxyServiceMap map[proxy.ServicePortName]*serviceInfo
|
||||
type proxyEndpointsMap map[proxy.ServicePortName][]*endpointsInfo
|
||||
|
||||
@@ -694,29 +705,29 @@ func shouldSkipService(svcName types.NamespacedName, service *api.Service) bool
|
||||
// <changes> map is cleared after applying them.
|
||||
func updateServiceMap(
|
||||
serviceMap proxyServiceMap,
|
||||
changes *serviceChangeMap) (hcServices map[types.NamespacedName]uint16, staleServices sets.String) {
|
||||
staleServices = sets.NewString()
|
||||
changes *serviceChangeMap) (result updateServiceMapResult) {
|
||||
result.staleServices = sets.NewString()
|
||||
|
||||
func() {
|
||||
changes.lock.Lock()
|
||||
defer changes.lock.Unlock()
|
||||
for _, change := range changes.items {
|
||||
existingPorts := serviceMap.merge(change.current)
|
||||
serviceMap.unmerge(change.previous, existingPorts, staleServices)
|
||||
serviceMap.unmerge(change.previous, existingPorts, result.staleServices)
|
||||
}
|
||||
changes.items = make(map[types.NamespacedName]*serviceChange)
|
||||
}()
|
||||
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to serviceMap.
|
||||
hcServices = make(map[types.NamespacedName]uint16)
|
||||
result.hcServices = make(map[types.NamespacedName]uint16)
|
||||
for svcPortName, info := range serviceMap {
|
||||
if info.healthCheckNodePort != 0 {
|
||||
hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
|
||||
result.hcServices[svcPortName.NamespacedName] = uint16(info.healthCheckNodePort)
|
||||
}
|
||||
}
|
||||
|
||||
return hcServices, staleServices
|
||||
return result
|
||||
}
|
||||
|
||||
func (proxier *Proxier) OnEndpointsAdd(endpoints *api.Endpoints) {
|
||||
@@ -755,8 +766,9 @@ func (proxier *Proxier) OnEndpointsSynced() {
|
||||
func updateEndpointsMap(
|
||||
endpointsMap proxyEndpointsMap,
|
||||
changes *endpointsChangeMap,
|
||||
hostname string) (hcEndpoints map[types.NamespacedName]int, staleSet map[endpointServicePair]bool) {
|
||||
staleSet = make(map[endpointServicePair]bool)
|
||||
hostname string) (result updateEndpointMapResult) {
|
||||
result.staleEndpoints = make(map[endpointServicePair]bool)
|
||||
result.staleServiceNames = make(map[proxy.ServicePortName]bool)
|
||||
|
||||
func() {
|
||||
changes.lock.Lock()
|
||||
@@ -764,7 +776,7 @@ func updateEndpointsMap(
|
||||
for _, change := range changes.items {
|
||||
endpointsMap.unmerge(change.previous)
|
||||
endpointsMap.merge(change.current)
|
||||
detectStaleConnections(change.previous, change.current, staleSet)
|
||||
detectStaleConnections(change.previous, change.current, result.staleEndpoints, result.staleServiceNames)
|
||||
}
|
||||
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
||||
}()
|
||||
@@ -775,18 +787,17 @@ func updateEndpointsMap(
|
||||
|
||||
// TODO: If this will appear to be computationally expensive, consider
|
||||
// computing this incrementally similarly to endpointsMap.
|
||||
hcEndpoints = make(map[types.NamespacedName]int)
|
||||
result.hcEndpoints = make(map[types.NamespacedName]int)
|
||||
localIPs := getLocalIPs(endpointsMap)
|
||||
for nsn, ips := range localIPs {
|
||||
hcEndpoints[nsn] = len(ips)
|
||||
result.hcEndpoints[nsn] = len(ips)
|
||||
}
|
||||
|
||||
return hcEndpoints, staleSet
|
||||
return result
|
||||
}
|
||||
|
||||
// <staleEndpoints> are modified by this function with detected stale
|
||||
// connections.
|
||||
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool) {
|
||||
// <staleEndpoints> and <staleServices> are modified by this function with detected stale connections.
|
||||
func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap, staleEndpoints map[endpointServicePair]bool, staleServiceNames map[proxy.ServicePortName]bool) {
|
||||
for svcPortName, epList := range oldEndpointsMap {
|
||||
for _, ep := range epList {
|
||||
stale := true
|
||||
@@ -802,6 +813,13 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap proxyEndpointsMap,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for svcPortName, epList := range newEndpointsMap {
|
||||
// For udp service, if its backend changes from 0 to non-0. There may exist a conntrack entry that could blackhole traffic to the service.
|
||||
if len(epList) > 0 && len(oldEndpointsMap[svcPortName]) == 0 {
|
||||
staleServiceNames[svcPortName] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.String {
|
||||
@@ -983,11 +1001,20 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// We assume that if this was called, we really want to sync them,
|
||||
// even if nothing changed in the meantime. In other words, callers are
|
||||
// responsible for detecting no-op changes and not calling this function.
|
||||
hcServices, staleServices := updateServiceMap(
|
||||
serviceUpdateResult := updateServiceMap(
|
||||
proxier.serviceMap, &proxier.serviceChanges)
|
||||
hcEndpoints, staleEndpoints := updateEndpointsMap(
|
||||
endpointUpdateResult := updateEndpointsMap(
|
||||
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
|
||||
|
||||
staleServices := serviceUpdateResult.staleServices
|
||||
// merge stale services gathered from updateEndpointsMap
|
||||
for svcPortName := range endpointUpdateResult.staleServiceNames {
|
||||
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && svcInfo.protocol == api.ProtocolUDP {
|
||||
glog.V(2).Infof("Stale udp service %v -> %s", svcPortName, svcInfo.clusterIP.String())
|
||||
staleServices.Insert(svcInfo.clusterIP.String())
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(3).Infof("Syncing iptables rules")
|
||||
|
||||
// Create and link the kube services chain.
|
||||
@@ -1594,17 +1621,17 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Update healthchecks. The endpoints list might include services that are
|
||||
// not "OnlyLocal", but the services list will not, and the healthChecker
|
||||
// will just drop those endpoints.
|
||||
if err := proxier.healthChecker.SyncServices(hcServices); err != nil {
|
||||
if err := proxier.healthChecker.SyncServices(serviceUpdateResult.hcServices); err != nil {
|
||||
glog.Errorf("Error syncing healtcheck services: %v", err)
|
||||
}
|
||||
if err := proxier.healthChecker.SyncEndpoints(hcEndpoints); err != nil {
|
||||
if err := proxier.healthChecker.SyncEndpoints(endpointUpdateResult.hcEndpoints); err != nil {
|
||||
glog.Errorf("Error syncing healthcheck endoints: %v", err)
|
||||
}
|
||||
|
||||
// Finish housekeeping.
|
||||
// TODO: these and clearUDPConntrackForPort() could be made more consistent.
|
||||
utilproxy.DeleteServiceConnections(proxier.exec, staleServices.List())
|
||||
proxier.deleteEndpointConnections(staleEndpoints)
|
||||
proxier.deleteEndpointConnections(endpointUpdateResult.staleEndpoints)
|
||||
}
|
||||
|
||||
// Clear UDP conntrack for port or all conntrack entries when port equal zero.
|
||||
|
@@ -1088,24 +1088,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
for i := range services {
|
||||
fp.OnServiceAdd(services[i])
|
||||
}
|
||||
hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 8 {
|
||||
t.Errorf("expected service map length 8, got %v", fp.serviceMap)
|
||||
}
|
||||
|
||||
// The only-local-loadbalancer ones get added
|
||||
if len(hcPorts) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", hcPorts)
|
||||
if len(result.hcServices) != 1 {
|
||||
t.Errorf("expected 1 healthcheck port, got %v", result.hcServices)
|
||||
} else {
|
||||
nsn := makeNSN("somewhere", "only-local-load-balancer")
|
||||
if port, found := hcPorts[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, hcPorts)
|
||||
if port, found := result.hcServices[nsn]; !found || port != 345 {
|
||||
t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, result.hcServices)
|
||||
}
|
||||
}
|
||||
|
||||
if len(staleUDPServices) != 0 {
|
||||
if len(result.staleServices) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||
}
|
||||
|
||||
// Remove some stuff
|
||||
@@ -1121,24 +1121,24 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
||||
fp.OnServiceDelete(services[2])
|
||||
fp.OnServiceDelete(services[3])
|
||||
|
||||
hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 1 {
|
||||
t.Errorf("expected service map length 1, got %v", fp.serviceMap)
|
||||
}
|
||||
|
||||
if len(hcPorts) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", hcPorts)
|
||||
if len(result.hcServices) != 0 {
|
||||
t.Errorf("expected 0 healthcheck ports, got %v", result.hcServices)
|
||||
}
|
||||
|
||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||
// from the three deleted services here, we still have the ClusterIP for
|
||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
||||
if len(staleUDPServices) != len(expectedStaleUDPServices) {
|
||||
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), staleUDPServices.List())
|
||||
if len(result.staleServices) != len(expectedStaleUDPServices) {
|
||||
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.staleServices.List())
|
||||
}
|
||||
for _, ip := range expectedStaleUDPServices {
|
||||
if !staleUDPServices.Has(ip) {
|
||||
if !result.staleServices.Has(ip) {
|
||||
t.Errorf("expected stale UDP service service %s", ip)
|
||||
}
|
||||
}
|
||||
@@ -1161,18 +1161,18 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
||||
)
|
||||
|
||||
// Headless service should be ignored
|
||||
hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %d", len(fp.serviceMap))
|
||||
}
|
||||
|
||||
// No proxied services, so no healthchecks
|
||||
if len(hcPorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(hcPorts))
|
||||
if len(result.hcServices) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %d", len(result.hcServices))
|
||||
}
|
||||
|
||||
if len(staleUDPServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||
if len(result.staleServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1189,16 +1189,16 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
||||
}),
|
||||
)
|
||||
|
||||
hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 0 {
|
||||
t.Errorf("expected service map length 0, got %v", fp.serviceMap)
|
||||
}
|
||||
// No proxied services, so no healthchecks
|
||||
if len(hcPorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
|
||||
if len(result.hcServices) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
|
||||
}
|
||||
if len(staleUDPServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices)
|
||||
if len(result.staleServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1231,57 +1231,57 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
||||
|
||||
fp.OnServiceAdd(servicev1)
|
||||
|
||||
hcPorts, staleUDPServices := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result := updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||
}
|
||||
if len(hcPorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
|
||||
if len(result.hcServices) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
|
||||
}
|
||||
if len(staleUDPServices) != 0 {
|
||||
if len(result.staleServices) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||
}
|
||||
|
||||
// Change service to load-balancer
|
||||
fp.OnServiceUpdate(servicev1, servicev2)
|
||||
hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||
}
|
||||
if len(hcPorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
|
||||
if len(result.hcServices) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices)
|
||||
}
|
||||
if len(staleUDPServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
|
||||
if len(result.staleServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List())
|
||||
}
|
||||
|
||||
// No change; make sure the service map stays the same and there are
|
||||
// no health-check changes
|
||||
fp.OnServiceUpdate(servicev2, servicev2)
|
||||
hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||
}
|
||||
if len(hcPorts) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", hcPorts)
|
||||
if len(result.hcServices) != 1 {
|
||||
t.Errorf("expected healthcheck ports length 1, got %v", result.hcServices)
|
||||
}
|
||||
if len(staleUDPServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List())
|
||||
if len(result.staleServices) != 0 {
|
||||
t.Errorf("expected stale UDP services length 0, got %v", result.staleServices.List())
|
||||
}
|
||||
|
||||
// And back to ClusterIP
|
||||
fp.OnServiceUpdate(servicev2, servicev1)
|
||||
hcPorts, staleUDPServices = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
result = updateServiceMap(fp.serviceMap, &fp.serviceChanges)
|
||||
if len(fp.serviceMap) != 2 {
|
||||
t.Errorf("expected service map length 2, got %v", fp.serviceMap)
|
||||
}
|
||||
if len(hcPorts) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", hcPorts)
|
||||
if len(result.hcServices) != 0 {
|
||||
t.Errorf("expected healthcheck ports length 0, got %v", result.hcServices)
|
||||
}
|
||||
if len(staleUDPServices) != 0 {
|
||||
if len(result.staleServices) != 0 {
|
||||
// Services only added, so nothing stale yet
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(staleUDPServices))
|
||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.staleServices))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1610,6 +1610,9 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap, expected map[proxy.Serv
|
||||
func Test_updateEndpointsMap(t *testing.T) {
|
||||
var nodeName = testHostname
|
||||
|
||||
emptyEndpoint := func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{}
|
||||
}
|
||||
unnamedPort := func(ept *api.Endpoints) {
|
||||
ept.Subsets = []api.EndpointSubset{{
|
||||
Addresses: []api.EndpointAddress{{
|
||||
@@ -1914,18 +1917,20 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
// previousEndpoints and currentEndpoints are used to call appropriate
|
||||
// handlers OnEndpoints* (based on whether corresponding values are nil
|
||||
// or non-nil) and must be of equal length.
|
||||
previousEndpoints []*api.Endpoints
|
||||
currentEndpoints []*api.Endpoints
|
||||
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedStale []endpointServicePair
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
previousEndpoints []*api.Endpoints
|
||||
currentEndpoints []*api.Endpoints
|
||||
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||
expectedStaleEndpoints []endpointServicePair
|
||||
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
||||
expectedHealthchecks map[types.NamespacedName]int
|
||||
}{{
|
||||
// Case[0]: nothing
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[1]: no change, unnamed port
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
@@ -1944,8 +1949,9 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[2]: no change, named port, local
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
@@ -1964,7 +1970,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
@@ -1992,8 +1999,9 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.2:12", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[4]: no change, multiple subsets, multiple ports, local
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
@@ -2024,7 +2032,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.3:13", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
@@ -2090,7 +2099,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "2.2.2.2:22", isLocal: true},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 2,
|
||||
makeNSN("ns2", "ep2"): 1,
|
||||
@@ -2109,7 +2119,10 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.1:11", isLocal: true},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", ""): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
@@ -2127,11 +2140,12 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
},
|
||||
},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedStale: []endpointServicePair{{
|
||||
expectedStaleEndpoints: []endpointServicePair{{
|
||||
endpoint: "1.1.1.1:11",
|
||||
servicePortName: makeServicePortName("ns1", "ep1", ""),
|
||||
}},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[8]: add an IP and port
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
@@ -2155,7 +2169,10 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.2:12", isLocal: true},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12"): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
@@ -2182,7 +2199,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{{
|
||||
expectedStaleEndpoints: []endpointServicePair{{
|
||||
endpoint: "1.1.1.2:11",
|
||||
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
||||
}, {
|
||||
@@ -2192,7 +2209,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
endpoint: "1.1.1.2:12",
|
||||
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
|
||||
}},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[10]: add a subset
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
@@ -2214,7 +2232,10 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.2:12", isLocal: true},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12"): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
makeNSN("ns1", "ep1"): 1,
|
||||
},
|
||||
@@ -2239,11 +2260,12 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{{
|
||||
expectedStaleEndpoints: []endpointServicePair{{
|
||||
endpoint: "1.1.1.2:12",
|
||||
servicePortName: makeServicePortName("ns1", "ep1", "p12"),
|
||||
}},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[12]: rename a port
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
@@ -2262,10 +2284,13 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{{
|
||||
expectedStaleEndpoints: []endpointServicePair{{
|
||||
endpoint: "1.1.1.1:11",
|
||||
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p11-2"): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[13]: renumber a port
|
||||
@@ -2285,11 +2310,12 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "1.1.1.1:22", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{{
|
||||
expectedStaleEndpoints: []endpointServicePair{{
|
||||
endpoint: "1.1.1.1:11",
|
||||
servicePortName: makeServicePortName("ns1", "ep1", "p11"),
|
||||
}},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
}, {
|
||||
// Case[14]: complex add and remove
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
@@ -2341,7 +2367,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
{endpoint: "4.4.4.4:44", isLocal: true},
|
||||
},
|
||||
},
|
||||
expectedStale: []endpointServicePair{{
|
||||
expectedStaleEndpoints: []endpointServicePair{{
|
||||
endpoint: "2.2.2.2:22",
|
||||
servicePortName: makeServicePortName("ns2", "ep2", "p22"),
|
||||
}, {
|
||||
@@ -2357,10 +2383,35 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
endpoint: "4.4.4.6:45",
|
||||
servicePortName: makeServicePortName("ns4", "ep4", "p45"),
|
||||
}},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", "p12"): true,
|
||||
makeServicePortName("ns1", "ep1", "p122"): true,
|
||||
makeServicePortName("ns3", "ep3", "p33"): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{
|
||||
makeNSN("ns4", "ep4"): 1,
|
||||
},
|
||||
}}
|
||||
}, {
|
||||
// Case[15]: change from 0 endpoint address to 1 unnamed port
|
||||
previousEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", emptyEndpoint),
|
||||
},
|
||||
currentEndpoints: []*api.Endpoints{
|
||||
makeTestEndpoints("ns1", "ep1", unnamedPort),
|
||||
},
|
||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
|
||||
makeServicePortName("ns1", "ep1", ""): {
|
||||
{endpoint: "1.1.1.1:11", isLocal: false},
|
||||
},
|
||||
},
|
||||
expectedStaleEndpoints: []endpointServicePair{},
|
||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
||||
makeServicePortName("ns1", "ep1", ""): true,
|
||||
},
|
||||
expectedHealthchecks: map[types.NamespacedName]int{},
|
||||
},
|
||||
}
|
||||
|
||||
for tci, tc := range testCases {
|
||||
ipt := iptablestest.NewFake()
|
||||
@@ -2394,19 +2445,27 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
fp.OnEndpointsUpdate(prev, curr)
|
||||
}
|
||||
}
|
||||
hcEndpoints, stale := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname)
|
||||
result := updateEndpointsMap(fp.endpointsMap, &fp.endpointsChanges, fp.hostname)
|
||||
newMap := fp.endpointsMap
|
||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
||||
if len(stale) != len(tc.expectedStale) {
|
||||
t.Errorf("[%d] expected %d stale, got %d: %v", tci, len(tc.expectedStale), len(stale), stale)
|
||||
if len(result.staleEndpoints) != len(tc.expectedStaleEndpoints) {
|
||||
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.staleEndpoints), result.staleEndpoints)
|
||||
}
|
||||
for _, x := range tc.expectedStale {
|
||||
if stale[x] != true {
|
||||
t.Errorf("[%d] expected stale[%v], but didn't find it: %v", tci, x, stale)
|
||||
for _, x := range tc.expectedStaleEndpoints {
|
||||
if result.staleEndpoints[x] != true {
|
||||
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.staleEndpoints)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(hcEndpoints, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, hcEndpoints)
|
||||
if len(result.staleServiceNames) != len(tc.expectedStaleServiceNames) {
|
||||
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.staleServiceNames), result.staleServiceNames)
|
||||
}
|
||||
for svcName := range tc.expectedStaleServiceNames {
|
||||
if result.staleServiceNames[svcName] != true {
|
||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.staleServiceNames)
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(result.hcEndpoints, tc.expectedHealthchecks) {
|
||||
t.Errorf("[%d] expected healthchecks %v, got %v", tci, tc.expectedHealthchecks, result.hcEndpoints)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user