mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 02:41:25 +00:00
Improve the naming of the stale-conntrack-entry-tracking fields
The APIs talked about "stale services" and "stale endpoints", but the thing that is actually "stale" is the conntrack entries, not the services/endpoints. Fix the names to indicate what they actual keep track of. Also, all three fields (2 in the endpoints update object and 1 in the service update object) are currently UDP-specific, but only the service one made that clear. Fix that too.
This commit is contained in:
parent
4381973a44
commit
dea8e34ea7
@ -293,10 +293,17 @@ type endpointsChange struct {
|
|||||||
|
|
||||||
// UpdateEndpointMapResult is the updated results after applying endpoints changes.
|
// UpdateEndpointMapResult is the updated results after applying endpoints changes.
|
||||||
type UpdateEndpointMapResult struct {
|
type UpdateEndpointMapResult struct {
|
||||||
// StaleEndpoints identifies if an endpoints service pair is stale.
|
// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
|
||||||
StaleEndpoints []ServiceEndpoint
|
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
|
||||||
// StaleServiceNames identifies if a service is stale.
|
// ensure that no further traffic for the Service gets delivered to them.
|
||||||
StaleServiceNames []ServicePortName
|
DeletedUDPEndpoints []ServiceEndpoint
|
||||||
|
|
||||||
|
// NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to
|
||||||
|
// non-0 endpoints. Existing conntrack entries caching the fact that these
|
||||||
|
// services are black holes must be deleted to ensure that traffic can immediately
|
||||||
|
// begin flowing to the new endpoints.
|
||||||
|
NewlyActiveUDPServices []ServicePortName
|
||||||
|
|
||||||
// List of the trigger times for all endpoints objects that changed. It's used to export the
|
// List of the trigger times for all endpoints objects that changed. It's used to export the
|
||||||
// network programming latency.
|
// network programming latency.
|
||||||
// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
|
// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
|
||||||
@ -305,26 +312,24 @@ type UpdateEndpointMapResult struct {
|
|||||||
|
|
||||||
// Update updates endpointsMap base on the given changes.
|
// Update updates endpointsMap base on the given changes.
|
||||||
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
|
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
|
||||||
result.StaleEndpoints = make([]ServiceEndpoint, 0)
|
result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0)
|
||||||
result.StaleServiceNames = make([]ServicePortName, 0)
|
result.NewlyActiveUDPServices = make([]ServicePortName, 0)
|
||||||
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
||||||
|
|
||||||
em.apply(
|
em.apply(changes, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices, &result.LastChangeTriggerTimes)
|
||||||
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// EndpointsMap maps a service name to a list of all its Endpoints.
|
// EndpointsMap maps a service name to a list of all its Endpoints.
|
||||||
type EndpointsMap map[ServicePortName][]Endpoint
|
type EndpointsMap map[ServicePortName][]Endpoint
|
||||||
|
|
||||||
// apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument
|
// apply the changes to EndpointsMap, update the passed-in stale-conntrack-entry arrays,
|
||||||
// is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service.
|
// and clear the changes map. In addition it returns (via argument) and resets the
|
||||||
// The changes map is cleared after applying them.
|
// lastChangeTriggerTimes for all endpoints that were changed and will result in syncing
|
||||||
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
|
// the proxy rules. apply triggers processEndpointsMapChange on every change.
|
||||||
// that were changed and will result in syncing the proxy rules.
|
func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
|
||||||
// apply triggers processEndpointsMapChange on every change.
|
newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||||
func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
|
|
||||||
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
|
||||||
if ect == nil {
|
if ect == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -336,7 +341,7 @@ func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]Servi
|
|||||||
}
|
}
|
||||||
em.unmerge(change.previous)
|
em.unmerge(change.previous)
|
||||||
em.merge(change.current)
|
em.merge(change.current)
|
||||||
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
|
detectStaleConntrackEntries(change.previous, change.current, deletedUDPEndpoints, newlyActiveUDPServices)
|
||||||
}
|
}
|
||||||
ect.checkoutTriggerTimes(lastChangeTriggerTimes)
|
ect.checkoutTriggerTimes(lastChangeTriggerTimes)
|
||||||
}
|
}
|
||||||
@ -397,41 +402,45 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
|
|||||||
return eps
|
return eps
|
||||||
}
|
}
|
||||||
|
|
||||||
// detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames>
|
// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
|
||||||
// is used to store stale udp service in order to clear udp conntrack later.
|
// (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
|
||||||
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
|
func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
|
||||||
// Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic
|
// Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
|
||||||
// and then goes unready or changes its IP address.
|
// are no longer sending to newEndpointsMap. The proxier should make sure that
|
||||||
|
// conntrack does not accidentally route any new connections to them.
|
||||||
for svcPortName, epList := range oldEndpointsMap {
|
for svcPortName, epList := range oldEndpointsMap {
|
||||||
if svcPortName.Protocol != v1.ProtocolUDP {
|
if svcPortName.Protocol != v1.ProtocolUDP {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ep := range epList {
|
for _, ep := range epList {
|
||||||
// if the old endpoint wasn't ready is not possible to have stale entries
|
// If the old endpoint wasn't Ready then there can't be stale
|
||||||
// since there was no traffic sent to it.
|
// conntrack entries since there was no traffic sent to it.
|
||||||
if !ep.IsReady() {
|
if !ep.IsReady() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
stale := true
|
|
||||||
// Check if the endpoint has changed, including if it went from ready to not ready.
|
deleted := true
|
||||||
// If it did change stale entries for the old endpoint has to be cleared.
|
// Check if the endpoint has changed, including if it went from
|
||||||
|
// ready to not ready. If it did change stale entries for the old
|
||||||
|
// endpoint have to be cleared.
|
||||||
for i := range newEndpointsMap[svcPortName] {
|
for i := range newEndpointsMap[svcPortName] {
|
||||||
if newEndpointsMap[svcPortName][i].Equal(ep) {
|
if newEndpointsMap[svcPortName][i].Equal(ep) {
|
||||||
stale = false
|
deleted = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if stale {
|
if deleted {
|
||||||
klog.V(4).InfoS("Stale endpoint", "portName", svcPortName, "endpoint", ep)
|
klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep)
|
||||||
*staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
|
*deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Detect stale services
|
// Detect services that have gone from 0 to non-0 ready endpoints. If there were
|
||||||
// For udp service, if its backend changes from 0 to non-0 ready endpoints.
|
// previously 0 endpoints, but someone tried to connect to it, then a conntrack
|
||||||
// There may exist a conntrack entry that could blackhole traffic to the service.
|
// entry may have been created blackholing traffic to that IP, which should be
|
||||||
|
// deleted now.
|
||||||
for svcPortName, epList := range newEndpointsMap {
|
for svcPortName, epList := range newEndpointsMap {
|
||||||
if svcPortName.Protocol != v1.ProtocolUDP {
|
if svcPortName.Protocol != v1.ProtocolUDP {
|
||||||
continue
|
continue
|
||||||
@ -452,7 +461,7 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
|
|||||||
}
|
}
|
||||||
|
|
||||||
if epReady > 0 && oldEpReady == 0 {
|
if epReady > 0 && oldEpReady == 0 {
|
||||||
*staleServiceNames = append(*staleServiceNames, svcPortName)
|
*newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -498,23 +498,23 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
// previousEndpoints and currentEndpoints are used to call appropriate
|
// previousEndpoints and currentEndpoints are used to call appropriate
|
||||||
// handlers OnEndpointSlice* (based on whether corresponding values are nil
|
// handlers OnEndpointSlice* (based on whether corresponding values are nil
|
||||||
// or non-nil) and must be of equal length.
|
// or non-nil) and must be of equal length.
|
||||||
name string
|
name string
|
||||||
previousEndpoints []*discovery.EndpointSlice
|
previousEndpoints []*discovery.EndpointSlice
|
||||||
currentEndpoints []*discovery.EndpointSlice
|
currentEndpoints []*discovery.EndpointSlice
|
||||||
oldEndpoints map[ServicePortName][]*BaseEndpointInfo
|
oldEndpoints map[ServicePortName][]*BaseEndpointInfo
|
||||||
expectedResult map[ServicePortName][]*BaseEndpointInfo
|
expectedResult map[ServicePortName][]*BaseEndpointInfo
|
||||||
expectedStaleEndpoints []ServiceEndpoint
|
expectedDeletedUDPEndpoints []ServiceEndpoint
|
||||||
expectedStaleServiceNames map[ServicePortName]bool
|
expectedNewlyActiveUDPServices map[ServicePortName]bool
|
||||||
expectedLocalEndpoints map[types.NamespacedName]int
|
expectedLocalEndpoints map[types.NamespacedName]int
|
||||||
expectedChangedEndpoints sets.String
|
expectedChangedEndpoints sets.String
|
||||||
}{{
|
}{{
|
||||||
name: "empty",
|
name: "empty",
|
||||||
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{},
|
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{},
|
||||||
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
|
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
expectedChangedEndpoints: sets.NewString(),
|
expectedChangedEndpoints: sets.NewString(),
|
||||||
}, {
|
}, {
|
||||||
name: "no change, unnamed port",
|
name: "no change, unnamed port",
|
||||||
previousEndpoints: []*discovery.EndpointSlice{
|
previousEndpoints: []*discovery.EndpointSlice{
|
||||||
@ -533,10 +533,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
expectedChangedEndpoints: sets.NewString(),
|
expectedChangedEndpoints: sets.NewString(),
|
||||||
}, {
|
}, {
|
||||||
name: "no change, named port, local",
|
name: "no change, named port, local",
|
||||||
previousEndpoints: []*discovery.EndpointSlice{
|
previousEndpoints: []*discovery.EndpointSlice{
|
||||||
@ -555,8 +555,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -587,10 +587,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
expectedChangedEndpoints: sets.NewString(),
|
expectedChangedEndpoints: sets.NewString(),
|
||||||
}, {
|
}, {
|
||||||
name: "no change, multiple slices, multiple ports, local",
|
name: "no change, multiple slices, multiple ports, local",
|
||||||
previousEndpoints: []*discovery.EndpointSlice{
|
previousEndpoints: []*discovery.EndpointSlice{
|
||||||
@ -623,8 +623,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -693,8 +693,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 2,
|
makeNSN("ns1", "ep1"): 2,
|
||||||
makeNSN("ns2", "ep2"): 1,
|
makeNSN("ns2", "ep2"): 1,
|
||||||
@ -714,8 +714,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
@ -736,13 +736,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
|
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.1:11",
|
Endpoint: "1.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||||
}, {
|
}, {
|
||||||
name: "add an IP and port",
|
name: "add an IP and port",
|
||||||
previousEndpoints: []*discovery.EndpointSlice{
|
previousEndpoints: []*discovery.EndpointSlice{
|
||||||
@ -766,8 +766,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
@ -797,7 +797,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.2:11",
|
Endpoint: "1.1.1.2:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}, {
|
}, {
|
||||||
@ -807,9 +807,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
Endpoint: "1.1.1.2:12",
|
Endpoint: "1.1.1.2:12",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||||
}, {
|
}, {
|
||||||
name: "add a slice to an endpoint",
|
name: "add a slice to an endpoint",
|
||||||
previousEndpoints: []*discovery.EndpointSlice{
|
previousEndpoints: []*discovery.EndpointSlice{
|
||||||
@ -833,8 +833,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
@ -864,13 +864,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.2:12",
|
Endpoint: "1.1.1.2:12",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||||
}, {
|
}, {
|
||||||
name: "rename a port",
|
name: "rename a port",
|
||||||
previousEndpoints: []*discovery.EndpointSlice{
|
previousEndpoints: []*discovery.EndpointSlice{
|
||||||
@ -889,11 +889,11 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.1:11",
|
Endpoint: "1.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
@ -916,13 +916,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.1:11",
|
Endpoint: "1.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
expectedChangedEndpoints: sets.NewString("ns1/ep1"),
|
||||||
}, {
|
}, {
|
||||||
name: "complex add and remove",
|
name: "complex add and remove",
|
||||||
previousEndpoints: []*discovery.EndpointSlice{
|
previousEndpoints: []*discovery.EndpointSlice{
|
||||||
@ -988,7 +988,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
|
||||||
Endpoint: "2.2.2.2:22",
|
Endpoint: "2.2.2.2:22",
|
||||||
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
|
||||||
}, {
|
}, {
|
||||||
@ -1004,7 +1004,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
Endpoint: "4.4.4.6:45",
|
Endpoint: "4.4.4.6:45",
|
||||||
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||||
@ -1027,8 +1027,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
@ -1079,33 +1079,33 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMapsStr(t, newMap, tc.expectedResult)
|
compareEndpointsMapsStr(t, newMap, tc.expectedResult)
|
||||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
|
||||||
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
|
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
for _, x := range tc.expectedStaleEndpoints {
|
for _, x := range tc.expectedDeletedUDPEndpoints {
|
||||||
found := false
|
found := false
|
||||||
for _, stale := range result.StaleEndpoints {
|
for _, stale := range result.DeletedUDPEndpoints {
|
||||||
if stale == x {
|
if stale == x {
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
|
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
|
if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
|
||||||
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
|
t.Errorf("[%d] expected %d newlyActiveUDPServices, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
|
||||||
}
|
}
|
||||||
for svcName := range tc.expectedStaleServiceNames {
|
for svcName := range tc.expectedNewlyActiveUDPServices {
|
||||||
found := false
|
found := false
|
||||||
for _, stale := range result.StaleServiceNames {
|
for _, newSvcName := range result.NewlyActiveUDPServices {
|
||||||
if stale == svcName {
|
if newSvcName == svcName {
|
||||||
found = true
|
found = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
t.Errorf("[%d] expected newlyActiveUDPServices[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -748,8 +748,8 @@ func isServiceChainName(chainString string) bool {
|
|||||||
// risk sending more traffic to it, all of which will be lost (because UDP).
|
// risk sending more traffic to it, all of which will be lost (because UDP).
|
||||||
// This assumes the proxier mutex is held
|
// This assumes the proxier mutex is held
|
||||||
// TODO: move it to util
|
// TODO: move it to util
|
||||||
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
|
func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) {
|
||||||
for _, epSvcPair := range connectionMap {
|
for _, epSvcPair := range deletedUDPEndpoints {
|
||||||
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
|
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
|
||||||
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
|
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
|
||||||
nodePort := svcInfo.NodePort()
|
nodePort := svcInfo.NodePort()
|
||||||
@ -833,13 +833,13 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
// We need to detect stale connections to UDP Services so we
|
// We need to detect stale connections to UDP Services so we
|
||||||
// can clean dangling conntrack entries that can blackhole traffic.
|
// can clean dangling conntrack entries that can blackhole traffic.
|
||||||
conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP
|
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
|
||||||
conntrackCleanupServiceNodePorts := sets.NewInt()
|
conntrackCleanupServiceNodePorts := sets.NewInt()
|
||||||
// merge stale services gathered from updateEndpointsMap
|
// merge stale services gathered from updateEndpointsMap
|
||||||
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
|
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
|
||||||
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
|
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
|
||||||
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
||||||
klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
|
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
|
||||||
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
|
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
|
||||||
for _, extIP := range svcInfo.ExternalIPStrings() {
|
for _, extIP := range svcInfo.ExternalIPStrings() {
|
||||||
conntrackCleanupServiceIPs.Insert(extIP)
|
conntrackCleanupServiceIPs.Insert(extIP)
|
||||||
@ -849,7 +849,6 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
}
|
}
|
||||||
nodePort := svcInfo.NodePort()
|
nodePort := svcInfo.NodePort()
|
||||||
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
|
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
|
||||||
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort)
|
|
||||||
conntrackCleanupServiceNodePorts.Insert(nodePort)
|
conntrackCleanupServiceNodePorts.Insert(nodePort)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1637,8 +1636,8 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
|
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
|
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints)
|
||||||
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
|
proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
|
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
|
||||||
|
@ -182,7 +182,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fp.deleteEndpointConnections(input)
|
fp.deleteUDPEndpointConnections(input)
|
||||||
|
|
||||||
// For UDP and SCTP connections, check the executed conntrack command
|
// For UDP and SCTP connections, check the executed conntrack command
|
||||||
var expExecs int
|
var expExecs int
|
||||||
@ -325,7 +325,7 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fp.deleteEndpointConnections(input)
|
fp.deleteUDPEndpointConnections(input)
|
||||||
|
|
||||||
// For UDP connections, check the executed conntrack command
|
// For UDP connections, check the executed conntrack command
|
||||||
var expExecs int
|
var expExecs int
|
||||||
@ -4079,9 +4079,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
t.Errorf("expected service map length 10, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 10, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// The only-local-loadbalancer ones get added
|
// The only-local-loadbalancer ones get added
|
||||||
@ -4117,11 +4117,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
// from the three deleted services here, we still have the ClusterIP for
|
// from the three deleted services here, we still have the ClusterIP for
|
||||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||||
expectedStaleUDPServices := []string{"172.30.55.10", "172.30.55.4", "172.30.55.11", "172.30.55.12"}
|
expectedStaleUDPServices := []string{"172.30.55.10", "172.30.55.4", "172.30.55.11", "172.30.55.12"}
|
||||||
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
|
if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) {
|
||||||
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
|
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.UnsortedList())
|
||||||
}
|
}
|
||||||
for _, ip := range expectedStaleUDPServices {
|
for _, ip := range expectedStaleUDPServices {
|
||||||
if !result.UDPStaleClusterIP.Has(ip) {
|
if !result.DeletedUDPClusterIPs.Has(ip) {
|
||||||
t.Errorf("expected stale UDP service service %s", ip)
|
t.Errorf("expected stale UDP service service %s", ip)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4154,8 +4154,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
|||||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
@ -4182,8 +4182,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 0 {
|
if len(fp.svcPortMap) != 0 {
|
||||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
|
||||||
}
|
}
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -4223,9 +4223,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||||
if len(healthCheckNodePorts) != 0 {
|
if len(healthCheckNodePorts) != 0 {
|
||||||
@ -4238,8 +4238,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
|
||||||
}
|
}
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
if len(healthCheckNodePorts) != 1 {
|
if len(healthCheckNodePorts) != 1 {
|
||||||
@ -4253,8 +4253,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
|
||||||
}
|
}
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
if len(healthCheckNodePorts) != 1 {
|
if len(healthCheckNodePorts) != 1 {
|
||||||
@ -4267,9 +4267,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
if len(healthCheckNodePorts) != 0 {
|
if len(healthCheckNodePorts) != 0 {
|
||||||
@ -4674,22 +4674,22 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
// previousEndpoints and currentEndpoints are used to call appropriate
|
// previousEndpoints and currentEndpoints are used to call appropriate
|
||||||
// handlers OnEndpoints* (based on whether corresponding values are nil
|
// handlers OnEndpoints* (based on whether corresponding values are nil
|
||||||
// or non-nil) and must be of equal length.
|
// or non-nil) and must be of equal length.
|
||||||
name string
|
name string
|
||||||
previousEndpoints []*discovery.EndpointSlice
|
previousEndpoints []*discovery.EndpointSlice
|
||||||
currentEndpoints []*discovery.EndpointSlice
|
currentEndpoints []*discovery.EndpointSlice
|
||||||
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
|
||||||
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
expectedResult map[proxy.ServicePortName][]*endpointsInfo
|
||||||
expectedStaleEndpoints []proxy.ServiceEndpoint
|
expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
|
||||||
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
|
||||||
expectedLocalEndpoints map[types.NamespacedName]int
|
expectedLocalEndpoints map[types.NamespacedName]int
|
||||||
}{{
|
}{{
|
||||||
// Case[0]: nothing
|
// Case[0]: nothing
|
||||||
name: "nothing",
|
name: "nothing",
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[1]: no change, named port, local
|
// Case[1]: no change, named port, local
|
||||||
name: "no change, named port, local",
|
name: "no change, named port, local",
|
||||||
@ -4705,8 +4705,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -4731,9 +4731,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[3]: no change, multiple subsets, multiple ports, local
|
// Case[3]: no change, multiple subsets, multiple ports, local
|
||||||
name: "no change, multiple subsets, multiple ports, local",
|
name: "no change, multiple subsets, multiple ports, local",
|
||||||
@ -4761,8 +4761,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -4823,8 +4823,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 2,
|
makeNSN("ns1", "ep1"): 2,
|
||||||
makeNSN("ns2", "ep2"): 1,
|
makeNSN("ns2", "ep2"): 1,
|
||||||
@ -4840,8 +4840,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
@ -4858,12 +4858,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "10.1.1.1:11",
|
Endpoint: "10.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[7]: add an IP and port
|
// Case[7]: add an IP and port
|
||||||
name: "add an IP and port",
|
name: "add an IP and port",
|
||||||
@ -4884,8 +4884,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
@ -4911,7 +4911,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "10.1.1.2:11",
|
Endpoint: "10.1.1.2:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}, {
|
}, {
|
||||||
@ -4921,8 +4921,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
Endpoint: "10.1.1.2:12",
|
Endpoint: "10.1.1.2:12",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[9]: add a subset
|
// Case[9]: add a subset
|
||||||
name: "add a subset",
|
name: "add a subset",
|
||||||
@ -4941,8 +4941,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{
|
expectedLocalEndpoints: map[types.NamespacedName]int{
|
||||||
@ -4966,12 +4966,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "10.1.1.2:12",
|
Endpoint: "10.1.1.2:12",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[11]: rename a port
|
// Case[11]: rename a port
|
||||||
name: "rename a port",
|
name: "rename a port",
|
||||||
@ -4987,11 +4987,11 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "10.1.1.1:11",
|
Endpoint: "10.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
@ -5010,12 +5010,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "10.1.1.1:11",
|
Endpoint: "10.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[13]: complex add and remove
|
// Case[13]: complex add and remove
|
||||||
name: "complex add and remove",
|
name: "complex add and remove",
|
||||||
@ -5058,7 +5058,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "10.2.2.2:22",
|
Endpoint: "10.2.2.2:22",
|
||||||
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
|
||||||
}, {
|
}, {
|
||||||
@ -5074,7 +5074,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
Endpoint: "10.4.4.6:45",
|
Endpoint: "10.4.4.6:45",
|
||||||
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||||
@ -5093,8 +5093,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
expectedLocalEndpoints: map[types.NamespacedName]int{},
|
||||||
@ -5136,33 +5136,33 @@ func TestUpdateEndpointsMap(t *testing.T) {
|
|||||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMapsExceptChainName(t, tci, newMap, tc.expectedResult)
|
compareEndpointsMapsExceptChainName(t, tci, newMap, tc.expectedResult)
|
||||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
|
||||||
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
|
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
for _, x := range tc.expectedStaleEndpoints {
|
for _, x := range tc.expectedDeletedUDPEndpoints {
|
||||||
found := false
|
found := false
|
||||||
for _, stale := range result.StaleEndpoints {
|
for _, stale := range result.DeletedUDPEndpoints {
|
||||||
if stale == x {
|
if stale == x {
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
|
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
|
if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
|
||||||
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
|
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
|
||||||
}
|
}
|
||||||
for svcName := range tc.expectedStaleServiceNames {
|
for svcName := range tc.expectedNewlyActiveUDPServices {
|
||||||
found := false
|
found := false
|
||||||
for _, stale := range result.StaleServiceNames {
|
for _, stale := range result.NewlyActiveUDPServices {
|
||||||
if stale == svcName {
|
if stale == svcName {
|
||||||
found = true
|
found = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||||
|
@ -948,13 +948,13 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
// We need to detect stale connections to UDP Services so we
|
// We need to detect stale connections to UDP Services so we
|
||||||
// can clean dangling conntrack entries that can blackhole traffic.
|
// can clean dangling conntrack entries that can blackhole traffic.
|
||||||
conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP
|
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
|
||||||
conntrackCleanupServiceNodePorts := sets.NewInt()
|
conntrackCleanupServiceNodePorts := sets.NewInt()
|
||||||
// merge stale services gathered from updateEndpointsMap
|
// merge stale services gathered from updateEndpointsMap
|
||||||
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
|
// an UDP service that changes from 0 to non-0 endpoints is considered stale.
|
||||||
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
|
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
|
||||||
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
||||||
klog.V(2).InfoS("Stale UDP service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
|
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
|
||||||
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
|
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
|
||||||
for _, extIP := range svcInfo.ExternalIPStrings() {
|
for _, extIP := range svcInfo.ExternalIPStrings() {
|
||||||
conntrackCleanupServiceIPs.Insert(extIP)
|
conntrackCleanupServiceIPs.Insert(extIP)
|
||||||
@ -964,7 +964,6 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
}
|
}
|
||||||
nodePort := svcInfo.NodePort()
|
nodePort := svcInfo.NodePort()
|
||||||
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
|
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
|
||||||
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort)
|
|
||||||
conntrackCleanupServiceNodePorts.Insert(nodePort)
|
conntrackCleanupServiceNodePorts.Insert(nodePort)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1545,8 +1544,8 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
|
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints)
|
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints)
|
||||||
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
|
proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
|
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
|
||||||
@ -1817,8 +1816,8 @@ func (proxier *Proxier) createAndLinkKubeChain() {
|
|||||||
// risk sending more traffic to it, all of which will be lost (because UDP).
|
// risk sending more traffic to it, all of which will be lost (because UDP).
|
||||||
// This assumes the proxier mutex is held
|
// This assumes the proxier mutex is held
|
||||||
// TODO: move it to util
|
// TODO: move it to util
|
||||||
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) {
|
func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) {
|
||||||
for _, epSvcPair := range connectionMap {
|
for _, epSvcPair := range deletedUDPEndpoints {
|
||||||
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
|
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && svcInfo.Protocol() == v1.ProtocolUDP {
|
||||||
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
|
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
|
||||||
nodePort := svcInfo.NodePort()
|
nodePort := svcInfo.NodePort()
|
||||||
|
@ -2543,9 +2543,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
t.Errorf("expected service map length 12, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 12, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// The only-local-loadbalancer ones get added
|
// The only-local-loadbalancer ones get added
|
||||||
@ -2581,11 +2581,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
// from the three deleted services here, we still have the ClusterIP for
|
// from the three deleted services here, we still have the ClusterIP for
|
||||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||||
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
||||||
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
|
if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) {
|
||||||
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.List())
|
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.List())
|
||||||
}
|
}
|
||||||
for _, ip := range expectedStaleUDPServices {
|
for _, ip := range expectedStaleUDPServices {
|
||||||
if !result.UDPStaleClusterIP.Has(ip) {
|
if !result.DeletedUDPClusterIPs.Has(ip) {
|
||||||
t.Errorf("expected stale UDP service service %s", ip)
|
t.Errorf("expected stale UDP service service %s", ip)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2625,8 +2625,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
|
|||||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
@ -2655,8 +2655,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 0 {
|
if len(fp.svcPortMap) != 0 {
|
||||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
@ -2699,9 +2699,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -2715,8 +2715,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.List())
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -2731,8 +2731,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List())
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.List())
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -2746,9 +2746,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -3148,22 +3148,22 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
// previousEndpoints and currentEndpoints are used to call appropriate
|
// previousEndpoints and currentEndpoints are used to call appropriate
|
||||||
// handlers OnEndpoints* (based on whether corresponding values are nil
|
// handlers OnEndpoints* (based on whether corresponding values are nil
|
||||||
// or non-nil) and must be of equal length.
|
// or non-nil) and must be of equal length.
|
||||||
name string
|
name string
|
||||||
previousEndpoints []*discovery.EndpointSlice
|
previousEndpoints []*discovery.EndpointSlice
|
||||||
currentEndpoints []*discovery.EndpointSlice
|
currentEndpoints []*discovery.EndpointSlice
|
||||||
oldEndpoints map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
|
oldEndpoints map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
|
||||||
expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
|
expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
|
||||||
expectedStaleEndpoints []proxy.ServiceEndpoint
|
expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
|
||||||
expectedStaleServiceNames map[proxy.ServicePortName]bool
|
expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
|
||||||
expectedReadyEndpoints map[types.NamespacedName]int
|
expectedReadyEndpoints map[types.NamespacedName]int
|
||||||
}{{
|
}{{
|
||||||
// Case[0]: nothing
|
// Case[0]: nothing
|
||||||
name: "nothing",
|
name: "nothing",
|
||||||
oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
||||||
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[1]: no change, named port, local
|
// Case[1]: no change, named port, local
|
||||||
name: "no change, named port, local",
|
name: "no change, named port, local",
|
||||||
@ -3179,8 +3179,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -3205,9 +3205,9 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[3]: no change, multiple subsets, multiple ports, local
|
// Case[3]: no change, multiple subsets, multiple ports, local
|
||||||
name: "no change, multiple subsets, multiple ports, local",
|
name: "no change, multiple subsets, multiple ports, local",
|
||||||
@ -3235,8 +3235,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 1,
|
makeNSN("ns1", "ep1"): 1,
|
||||||
},
|
},
|
||||||
@ -3297,8 +3297,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "2.2.2.2:22", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "2.2.2.2:22", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||||
makeNSN("ns1", "ep1"): 2,
|
makeNSN("ns1", "ep1"): 2,
|
||||||
makeNSN("ns2", "ep2"): 1,
|
makeNSN("ns2", "ep2"): 1,
|
||||||
@ -3314,8 +3314,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||||
@ -3332,12 +3332,12 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.1:11",
|
Endpoint: "1.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[7]: add an IP and port
|
// Case[7]: add an IP and port
|
||||||
name: "add an IP and port",
|
name: "add an IP and port",
|
||||||
@ -3358,8 +3358,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||||
@ -3385,7 +3385,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.2:11",
|
Endpoint: "1.1.1.2:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}, {
|
}, {
|
||||||
@ -3395,8 +3395,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
Endpoint: "1.1.1.2:12",
|
Endpoint: "1.1.1.2:12",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[9]: add a subset
|
// Case[9]: add a subset
|
||||||
name: "add a subset",
|
name: "add a subset",
|
||||||
@ -3415,8 +3415,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{
|
expectedReadyEndpoints: map[types.NamespacedName]int{
|
||||||
@ -3440,12 +3440,12 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.2:12",
|
Endpoint: "1.1.1.2:12",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[11]: rename a port
|
// Case[11]: rename a port
|
||||||
name: "rename a port",
|
name: "rename a port",
|
||||||
@ -3461,11 +3461,11 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.1:11",
|
Endpoint: "1.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
@ -3484,12 +3484,12 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "1.1.1.1:11",
|
Endpoint: "1.1.1.1:11",
|
||||||
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{},
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
}, {
|
}, {
|
||||||
// Case[13]: complex add and remove
|
// Case[13]: complex add and remove
|
||||||
name: "complex add and remove",
|
name: "complex add and remove",
|
||||||
@ -3532,7 +3532,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "4.4.4.4:44", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "4.4.4.4:44", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{{
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
|
||||||
Endpoint: "2.2.2.2:22",
|
Endpoint: "2.2.2.2:22",
|
||||||
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
|
||||||
}, {
|
}, {
|
||||||
@ -3548,7 +3548,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
Endpoint: "4.4.4.6:45",
|
Endpoint: "4.4.4.6:45",
|
||||||
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
|
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
|
||||||
}},
|
}},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
|
||||||
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
|
||||||
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
|
||||||
@ -3567,8 +3567,8 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedStaleEndpoints: []proxy.ServiceEndpoint{},
|
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
|
||||||
expectedStaleServiceNames: map[proxy.ServicePortName]bool{
|
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
|
||||||
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
|
||||||
},
|
},
|
||||||
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
expectedReadyEndpoints: map[types.NamespacedName]int{},
|
||||||
@ -3612,34 +3612,34 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
||||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
|
||||||
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
|
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
for _, x := range tc.expectedStaleEndpoints {
|
for _, x := range tc.expectedDeletedUDPEndpoints {
|
||||||
found := false
|
found := false
|
||||||
for _, stale := range result.StaleEndpoints {
|
for _, stale := range result.DeletedUDPEndpoints {
|
||||||
if stale == x {
|
if stale == x {
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints)
|
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) {
|
if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
|
||||||
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames)
|
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
|
||||||
}
|
}
|
||||||
for svcName := range tc.expectedStaleServiceNames {
|
for svcName := range tc.expectedNewlyActiveUDPServices {
|
||||||
found := false
|
found := false
|
||||||
for _, stale := range result.StaleServiceNames {
|
for _, stale := range result.NewlyActiveUDPServices {
|
||||||
if stale == svcName {
|
if stale == svcName {
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if !found {
|
if !found {
|
||||||
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames)
|
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
|
||||||
|
@ -337,15 +337,16 @@ func (sct *ServiceChangeTracker) PendingChanges() sets.String {
|
|||||||
|
|
||||||
// UpdateServiceMapResult is the updated results after applying service changes.
|
// UpdateServiceMapResult is the updated results after applying service changes.
|
||||||
type UpdateServiceMapResult struct {
|
type UpdateServiceMapResult struct {
|
||||||
// UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports.
|
// DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs
|
||||||
// Callers can use this to abort timeout-waits or clear connection-tracking information.
|
// that had UDP ports. Callers can use this to abort timeout-waits or clear
|
||||||
UDPStaleClusterIP sets.String
|
// connection-tracking information.
|
||||||
|
DeletedUDPClusterIPs sets.String
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update updates ServicePortMap base on the given changes.
|
// Update updates ServicePortMap base on the given changes.
|
||||||
func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
|
func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
|
||||||
result.UDPStaleClusterIP = sets.NewString()
|
result.DeletedUDPClusterIPs = sets.NewString()
|
||||||
sm.apply(changes, result.UDPStaleClusterIP)
|
sm.apply(changes, result.DeletedUDPClusterIPs)
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -398,10 +399,9 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
|
|||||||
return svcPortMap
|
return svcPortMap
|
||||||
}
|
}
|
||||||
|
|
||||||
// apply the changes to ServicePortMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the
|
// apply the changes to ServicePortMap and update the deleted UDP cluster IP set.
|
||||||
// udp protocol service cluster ip when service is deleted from the ServicePortMap.
|
|
||||||
// apply triggers processServiceMapChange on every change.
|
// apply triggers processServiceMapChange on every change.
|
||||||
func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) {
|
func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, deletedUDPClusterIPs sets.String) {
|
||||||
changes.lock.Lock()
|
changes.lock.Lock()
|
||||||
defer changes.lock.Unlock()
|
defer changes.lock.Unlock()
|
||||||
for _, change := range changes.items {
|
for _, change := range changes.items {
|
||||||
@ -412,7 +412,7 @@ func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP
|
|||||||
// filter out the Update event of current changes from previous changes before calling unmerge() so that can
|
// filter out the Update event of current changes from previous changes before calling unmerge() so that can
|
||||||
// skip deleting the Update events.
|
// skip deleting the Update events.
|
||||||
change.previous.filter(change.current)
|
change.previous.filter(change.current)
|
||||||
sm.unmerge(change.previous, UDPStaleClusterIP)
|
sm.unmerge(change.previous, deletedUDPClusterIPs)
|
||||||
}
|
}
|
||||||
// clear changes after applying them to ServicePortMap.
|
// clear changes after applying them to ServicePortMap.
|
||||||
changes.items = make(map[types.NamespacedName]*serviceChange)
|
changes.items = make(map[types.NamespacedName]*serviceChange)
|
||||||
@ -467,15 +467,15 @@ func (sm *ServicePortMap) filter(other ServicePortMap) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// unmerge deletes all other ServicePortMap's elements from current ServicePortMap. We pass in the UDPStaleClusterIP strings sets
|
// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and
|
||||||
// for storing the stale udp service cluster IPs. We will clear stale udp connection base on UDPStaleClusterIP later
|
// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs.
|
||||||
func (sm *ServicePortMap) unmerge(other ServicePortMap, UDPStaleClusterIP sets.String) {
|
func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.String) {
|
||||||
for svcPortName := range other {
|
for svcPortName := range other {
|
||||||
info, exists := (*sm)[svcPortName]
|
info, exists := (*sm)[svcPortName]
|
||||||
if exists {
|
if exists {
|
||||||
klog.V(4).InfoS("Removing service port", "portName", svcPortName)
|
klog.V(4).InfoS("Removing service port", "portName", svcPortName)
|
||||||
if info.Protocol() == v1.ProtocolUDP {
|
if info.Protocol() == v1.ProtocolUDP {
|
||||||
UDPStaleClusterIP.Insert(info.ClusterIP().String())
|
deletedUDPClusterIPs.Insert(info.ClusterIP().String())
|
||||||
}
|
}
|
||||||
delete(*sm, svcPortName)
|
delete(*sm, svcPortName)
|
||||||
} else {
|
} else {
|
||||||
|
@ -568,8 +568,9 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 0 {
|
if len(fp.svcPortMap) != 0 {
|
||||||
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
@ -599,8 +600,8 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 0 {
|
if len(fp.svcPortMap) != 0 {
|
||||||
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP)
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// No proxied services, so no healthchecks
|
// No proxied services, so no healthchecks
|
||||||
@ -673,9 +674,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 8 {
|
if len(fp.svcPortMap) != 8 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
// The only-local-loadbalancer ones get added
|
// The only-local-loadbalancer ones get added
|
||||||
@ -719,12 +720,12 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
|
|||||||
// All services but one were deleted. While you'd expect only the ClusterIPs
|
// All services but one were deleted. While you'd expect only the ClusterIPs
|
||||||
// from the three deleted services here, we still have the ClusterIP for
|
// from the three deleted services here, we still have the ClusterIP for
|
||||||
// the not-deleted service, because one of it's ServicePorts was deleted.
|
// the not-deleted service, because one of it's ServicePorts was deleted.
|
||||||
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
expectedDeletedUDPClusterIPs := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
|
||||||
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) {
|
if len(result.DeletedUDPClusterIPs) != len(expectedDeletedUDPClusterIPs) {
|
||||||
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList())
|
t.Errorf("expected stale UDP services length %d, got %v", len(expectedDeletedUDPClusterIPs), result.DeletedUDPClusterIPs.UnsortedList())
|
||||||
}
|
}
|
||||||
for _, ip := range expectedStaleUDPServices {
|
for _, ip := range expectedDeletedUDPClusterIPs {
|
||||||
if !result.UDPStaleClusterIP.Has(ip) {
|
if !result.DeletedUDPClusterIPs.Has(ip) {
|
||||||
t.Errorf("expected stale UDP service service %s", ip)
|
t.Errorf("expected stale UDP service service %s", ip)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -764,9 +765,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -784,8 +785,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -804,8 +805,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList())
|
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
@ -823,9 +824,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
|
|||||||
if len(fp.svcPortMap) != 2 {
|
if len(fp.svcPortMap) != 2 {
|
||||||
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
|
||||||
}
|
}
|
||||||
if len(result.UDPStaleClusterIP) != 0 {
|
if len(result.DeletedUDPClusterIPs) != 0 {
|
||||||
// Services only added, so nothing stale yet
|
// Services only added, so nothing stale yet
|
||||||
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP))
|
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
|
||||||
}
|
}
|
||||||
|
|
||||||
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
|
||||||
|
@ -1202,12 +1202,12 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
|
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
|
||||||
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
|
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
|
||||||
|
|
||||||
staleServices := serviceUpdateResult.UDPStaleClusterIP
|
deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
|
||||||
// merge stale services gathered from updateEndpointsMap
|
// merge stale services gathered from updateEndpointsMap
|
||||||
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
|
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
|
||||||
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
|
||||||
klog.V(2).InfoS("Stale udp service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
|
klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
|
||||||
staleServices.Insert(svcInfo.ClusterIP().String())
|
deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Query HNS for endpoints and load balancers
|
// Query HNS for endpoints and load balancers
|
||||||
@ -1653,7 +1653,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
|
|
||||||
// Finish housekeeping.
|
// Finish housekeeping.
|
||||||
// TODO: these could be made more consistent.
|
// TODO: these could be made more consistent.
|
||||||
for _, svcIP := range staleServices.UnsortedList() {
|
for _, svcIP := range deletedUDPClusterIPs.UnsortedList() {
|
||||||
// TODO : Check if this is required to cleanup stale services here
|
// TODO : Check if this is required to cleanup stale services here
|
||||||
klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
|
klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user