Merge pull request #115299 from danwinship/stale-conntrack-handling

clean up kube-proxy stale-conntrack-entry handling, revert broken code
This commit is contained in:
Kubernetes Prow Robot 2023-03-14 12:38:26 -07:00 committed by GitHub
commit 1d830156c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 471 additions and 603 deletions

View File

@ -293,10 +293,17 @@ type endpointsChange struct {
// UpdateEndpointMapResult is the updated results after applying endpoints changes. // UpdateEndpointMapResult is the updated results after applying endpoints changes.
type UpdateEndpointMapResult struct { type UpdateEndpointMapResult struct {
// StaleEndpoints identifies if an endpoints service pair is stale. // DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
StaleEndpoints []ServiceEndpoint // Existing conntrack NAT entries pointing to these endpoints must be deleted to
// StaleServiceNames identifies if a service is stale. // ensure that no further traffic for the Service gets delivered to them.
StaleServiceNames []ServicePortName DeletedUDPEndpoints []ServiceEndpoint
// NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to
// non-0 endpoints. Existing conntrack entries caching the fact that these
// services are black holes must be deleted to ensure that traffic can immediately
// begin flowing to the new endpoints.
NewlyActiveUDPServices []ServicePortName
// List of the trigger times for all endpoints objects that changed. It's used to export the // List of the trigger times for all endpoints objects that changed. It's used to export the
// network programming latency. // network programming latency.
// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue. // NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
@ -305,26 +312,24 @@ type UpdateEndpointMapResult struct {
// Update updates endpointsMap base on the given changes. // Update updates endpointsMap base on the given changes.
func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) { func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
result.StaleEndpoints = make([]ServiceEndpoint, 0) result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0)
result.StaleServiceNames = make([]ServicePortName, 0) result.NewlyActiveUDPServices = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
em.apply( em.apply(changes, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices, &result.LastChangeTriggerTimes)
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
return result return result
} }
// EndpointsMap maps a service name to a list of all its Endpoints. // EndpointsMap maps a service name to a list of all its Endpoints.
type EndpointsMap map[ServicePortName][]Endpoint type EndpointsMap map[ServicePortName][]Endpoint
// apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument // apply the changes to EndpointsMap, update the passed-in stale-conntrack-entry arrays,
// is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service. // and clear the changes map. In addition it returns (via argument) and resets the
// The changes map is cleared after applying them. // lastChangeTriggerTimes for all endpoints that were changed and will result in syncing
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints // the proxy rules. apply triggers processEndpointsMapChange on every change.
// that were changed and will result in syncing the proxy rules. func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
// apply triggers processEndpointsMapChange on every change. newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
if ect == nil { if ect == nil {
return return
} }
@ -336,7 +341,7 @@ func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]Servi
} }
em.unmerge(change.previous) em.unmerge(change.previous)
em.merge(change.current) em.merge(change.current)
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) detectStaleConntrackEntries(change.previous, change.current, deletedUDPEndpoints, newlyActiveUDPServices)
} }
ect.checkoutTriggerTimes(lastChangeTriggerTimes) ect.checkoutTriggerTimes(lastChangeTriggerTimes)
} }
@ -397,41 +402,45 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
return eps return eps
} }
// detectStaleConnections modifies <staleEndpoints> and <staleServices> with detected stale connections. <staleServiceNames> // detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
// is used to store stale udp service in order to clear udp conntrack later. // (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) { func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
// Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic // Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
// and then goes unready or changes its IP address. // are no longer sending to newEndpointsMap. The proxier should make sure that
// conntrack does not accidentally route any new connections to them.
for svcPortName, epList := range oldEndpointsMap { for svcPortName, epList := range oldEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP { if svcPortName.Protocol != v1.ProtocolUDP {
continue continue
} }
for _, ep := range epList { for _, ep := range epList {
// if the old endpoint wasn't ready is not possible to have stale entries // If the old endpoint wasn't Ready then there can't be stale
// since there was no traffic sent to it. // conntrack entries since there was no traffic sent to it.
if !ep.IsReady() { if !ep.IsReady() {
continue continue
} }
stale := true
// Check if the endpoint has changed, including if it went from ready to not ready. deleted := true
// If it did change stale entries for the old endpoint has to be cleared. // Check if the endpoint has changed, including if it went from
// ready to not ready. If it did change stale entries for the old
// endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] { for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].Equal(ep) { if newEndpointsMap[svcPortName][i].Equal(ep) {
stale = false deleted = false
break break
} }
} }
if stale { if deleted {
klog.V(4).InfoS("Stale endpoint", "portName", svcPortName, "endpoint", ep) klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep)
*staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName}) *deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
} }
} }
} }
// Detect stale services // Detect services that have gone from 0 to non-0 ready endpoints. If there were
// For udp service, if its backend changes from 0 to non-0 ready endpoints. // previously 0 endpoints, but someone tried to connect to it, then a conntrack
// There may exist a conntrack entry that could blackhole traffic to the service. // entry may have been created blackholing traffic to that IP, which should be
// deleted now.
for svcPortName, epList := range newEndpointsMap { for svcPortName, epList := range newEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP { if svcPortName.Protocol != v1.ProtocolUDP {
continue continue
@ -452,7 +461,7 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
} }
if epReady > 0 && oldEpReady == 0 { if epReady > 0 && oldEpReady == 0 {
*staleServiceNames = append(*staleServiceNames, svcPortName) *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
} }
} }
} }

View File

@ -498,23 +498,23 @@ func TestUpdateEndpointsMap(t *testing.T) {
// previousEndpoints and currentEndpoints are used to call appropriate // previousEndpoints and currentEndpoints are used to call appropriate
// handlers OnEndpointSlice* (based on whether corresponding values are nil // handlers OnEndpointSlice* (based on whether corresponding values are nil
// or non-nil) and must be of equal length. // or non-nil) and must be of equal length.
name string name string
previousEndpoints []*discovery.EndpointSlice previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[ServicePortName][]*BaseEndpointInfo oldEndpoints map[ServicePortName][]*BaseEndpointInfo
expectedResult map[ServicePortName][]*BaseEndpointInfo expectedResult map[ServicePortName][]*BaseEndpointInfo
expectedStaleEndpoints []ServiceEndpoint expectedDeletedUDPEndpoints []ServiceEndpoint
expectedStaleServiceNames map[ServicePortName]bool expectedNewlyActiveUDPServices map[ServicePortName]bool
expectedLocalEndpoints map[types.NamespacedName]int expectedLocalEndpoints map[types.NamespacedName]int
expectedChangedEndpoints sets.String expectedChangedEndpoints sets.String
}{{ }{{
name: "empty", name: "empty",
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(), expectedChangedEndpoints: sets.NewString(),
}, { }, {
name: "no change, unnamed port", name: "no change, unnamed port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpoints: []*discovery.EndpointSlice{
@ -533,10 +533,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(), expectedChangedEndpoints: sets.NewString(),
}, { }, {
name: "no change, named port, local", name: "no change, named port, local",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpoints: []*discovery.EndpointSlice{
@ -555,8 +555,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1, makeNSN("ns1", "ep1"): 1,
}, },
@ -587,10 +587,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString(), expectedChangedEndpoints: sets.NewString(),
}, { }, {
name: "no change, multiple slices, multiple ports, local", name: "no change, multiple slices, multiple ports, local",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpoints: []*discovery.EndpointSlice{
@ -623,8 +623,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1, makeNSN("ns1", "ep1"): 1,
}, },
@ -693,8 +693,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2, makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1, makeNSN("ns2", "ep2"): 1,
@ -714,8 +714,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{ expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
@ -736,13 +736,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
}, },
}, },
expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedStaleEndpoints: []ServiceEndpoint{{ expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11", Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"), expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, { }, {
name: "add an IP and port", name: "add an IP and port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpoints: []*discovery.EndpointSlice{
@ -766,8 +766,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{ expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
@ -797,7 +797,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{{ expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.2:11", Endpoint: "1.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}, { }, {
@ -807,9 +807,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
Endpoint: "1.1.1.2:12", Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"), expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, { }, {
name: "add a slice to an endpoint", name: "add a slice to an endpoint",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpoints: []*discovery.EndpointSlice{
@ -833,8 +833,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{ expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
@ -864,13 +864,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{{ expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.2:12", Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"), expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, { }, {
name: "rename a port", name: "rename a port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpoints: []*discovery.EndpointSlice{
@ -889,11 +889,11 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{{ expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11", Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[ServicePortName]bool{ expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
@ -916,13 +916,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{{ expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11", Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.NewString("ns1/ep1"), expectedChangedEndpoints: sets.NewString("ns1/ep1"),
}, { }, {
name: "complex add and remove", name: "complex add and remove",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpoints: []*discovery.EndpointSlice{
@ -988,7 +988,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{{ expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "2.2.2.2:22", Endpoint: "2.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, { }, {
@ -1004,7 +1004,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
Endpoint: "4.4.4.6:45", Endpoint: "4.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[ServicePortName]bool{ expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
@ -1027,8 +1027,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedStaleServiceNames: map[ServicePortName]bool{ expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
@ -1079,33 +1079,33 @@ func TestUpdateEndpointsMap(t *testing.T) {
result := fp.endpointsMap.Update(fp.endpointsChanges) result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap newMap := fp.endpointsMap
compareEndpointsMapsStr(t, newMap, tc.expectedResult) compareEndpointsMapsStr(t, newMap, tc.expectedResult)
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
} }
for _, x := range tc.expectedStaleEndpoints { for _, x := range tc.expectedDeletedUDPEndpoints {
found := false found := false
for _, stale := range result.StaleEndpoints { for _, stale := range result.DeletedUDPEndpoints {
if stale == x { if stale == x {
found = true found = true
break break
} }
} }
if !found { if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
} }
} }
if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) t.Errorf("[%d] expected %d newlyActiveUDPServices, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
} }
for svcName := range tc.expectedStaleServiceNames { for svcName := range tc.expectedNewlyActiveUDPServices {
found := false found := false
for _, stale := range result.StaleServiceNames { for _, newSvcName := range result.NewlyActiveUDPServices {
if stale == svcName { if newSvcName == svcName {
found = true found = true
} }
} }
if !found { if !found {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) t.Errorf("[%d] expected newlyActiveUDPServices[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
} }
} }

View File

@ -744,35 +744,34 @@ func isServiceChainName(chainString string) bool {
return false return false
} }
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost. // risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held // This assumes the proxier mutex is held
// TODO: move it to util // TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap { for _, epSvcPair := range deletedUDPEndpoints {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort() nodePort := svcInfo.NodePort()
svcProto := svcInfo.Protocol()
var err error var err error
if nodePort != 0 { if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
} }
} }
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
} }
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
} }
} }
for _, lbIP := range svcInfo.LoadBalancerIPStrings() { for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
} }
@ -834,13 +833,13 @@ func (proxier *Proxier) syncProxyRules() {
// We need to detect stale connections to UDP Services so we // We need to detect stale connections to UDP Services so we
// can clean dangling conntrack entries that can blackhole traffic. // can clean dangling conntrack entries that can blackhole traffic.
conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.NewInt() conntrackCleanupServiceNodePorts := sets.NewInt()
// merge stale services gathered from updateEndpointsMap // merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale. // an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP) conntrackCleanupServiceIPs.Insert(extIP)
@ -850,7 +849,6 @@ func (proxier *Proxier) syncProxyRules() {
} }
nodePort := svcInfo.NodePort() nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort)
conntrackCleanupServiceNodePorts.Insert(nodePort) conntrackCleanupServiceNodePorts.Insert(nodePort)
} }
} }
@ -1638,8 +1636,8 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
} }
} }
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints)
} }
func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {

View File

@ -60,7 +60,7 @@ import (
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
) )
func TestDeleteEndpointConnectionsIPv4(t *testing.T) { func TestDeleteEndpointConnections(t *testing.T) {
const ( const (
UDP = v1.ProtocolUDP UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP TCP = v1.ProtocolTCP
@ -73,8 +73,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
svcIP string svcIP string
svcPort int32 svcPort int32
protocol v1.Protocol protocol v1.Protocol
endpoint string // IP:port endpoint endpoint string // IP:port endpoint
epSvcPair proxy.ServiceEndpoint // Will be generated by test
simulatedErr string simulatedErr string
}{ }{
{ {
@ -104,140 +103,21 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) {
{ {
description: "V4 UDP, nothing to delete, benign error", description: "V4 UDP, nothing to delete, benign error",
svcName: "v4-udp-nothing-to-delete", svcName: "v4-udp-nothing-to-delete",
svcIP: "172.30.1.1", svcIP: "172.30.4.4",
svcPort: 80, svcPort: 80,
protocol: UDP, protocol: UDP,
endpoint: "10.240.0.3:80", endpoint: "10.240.0.6:80",
simulatedErr: conntrack.NoConnectionToDelete, simulatedErr: conntrack.NoConnectionToDelete,
}, },
{ {
description: "V4 UDP, unexpected error, should be glogged", description: "V4 UDP, unexpected error, should be glogged",
svcName: "v4-udp-simulated-error", svcName: "v4-udp-simulated-error",
svcIP: "172.30.1.1", svcIP: "172.30.5.5",
svcPort: 80, svcPort: 80,
protocol: UDP, protocol: UDP,
endpoint: "10.240.0.3:80", endpoint: "10.240.0.7:80",
simulatedErr: "simulated error", simulatedErr: "simulated error",
}, },
}
// Create a fake executor for the conntrack utility. This should only be
// invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
for _, tc := range testCases {
if conntrack.IsClearConntrackNeeded(tc.protocol) {
var cmdOutput string
var simErr error
if tc.simulatedErr == "" {
cmdOutput = "1 flow entries have been deleted"
} else {
simErr = fmt.Errorf(tc.simulatedErr)
}
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
}
ipt := iptablestest.NewFake()
fp := NewFakeProxier(ipt)
fp.exec = fexec
for _, tc := range testCases {
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
}
// Run the test cases
for _, tc := range testCases {
priorExecs := fexec.CommandCalls
priorGlogErrs := klog.Stats.Error.Lines()
svc := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
Protocol: tc.protocol,
}
input := []proxy.ServiceEndpoint{
{
Endpoint: tc.endpoint,
ServicePortName: svc,
},
}
fp.deleteEndpointConnections(input)
// For UDP and SCTP connections, check the executed conntrack command
var expExecs int
if conntrack.IsClearConntrackNeeded(tc.protocol) {
isIPv6 := func(ip string) bool {
netIP := netutils.ParseIPSloppy(ip)
return netIP.To4() == nil
}
endpointIP := utilproxy.IPPart(tc.endpoint)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol))))
if isIPv6(endpointIP) {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
if actualCommand != expectCommand {
t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
}
expExecs = 1
}
// Check the number of times conntrack was executed
execs := fexec.CommandCalls - priorExecs
if execs != expExecs {
t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs)
}
// Check the number of new glog errors
var expGlogErrs int64
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
expGlogErrs = 1
}
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
if glogErrs != expGlogErrs {
t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs)
}
}
}
func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
const (
UDP = v1.ProtocolUDP
TCP = v1.ProtocolTCP
SCTP = v1.ProtocolSCTP
)
testCases := []struct {
description string
svcName string
svcIP string
svcPort int32
protocol v1.Protocol
endpoint string // IP:port endpoint
epSvcPair proxy.ServiceEndpoint // Will be generated by test
simulatedErr string
}{
{ {
description: "V6 UDP", description: "V6 UDP",
svcName: "v6-udp", svcName: "v6-udp",
@ -264,103 +144,128 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) {
}, },
} }
// Create a fake executor for the conntrack utility. This should only be
// invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP
fcmd := fakeexec.FakeCmd{}
fexec := &fakeexec.FakeExec{
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
}
execFunc := func(cmd string, args ...string) exec.Cmd {
return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
}
for _, tc := range testCases { for _, tc := range testCases {
if conntrack.IsClearConntrackNeeded(tc.protocol) { t.Run(tc.description, func(t *testing.T) {
var cmdOutput string priorGlogErrs := klog.Stats.Error.Lines()
var simErr error
if tc.simulatedErr == "" { // Create a fake executor for the conntrack utility.
cmdOutput = "1 flow entries have been deleted" fcmd := fakeexec.FakeCmd{}
} else { fexec := &fakeexec.FakeExec{
simErr = fmt.Errorf(tc.simulatedErr) LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
} }
cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr } execFunc := func(cmd string, args ...string) exec.Cmd {
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) return fakeexec.InitFakeCmd(&fcmd, cmd, args...)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
}
ipt := iptablestest.NewIPv6Fake()
fp := NewFakeProxier(ipt)
fp.exec = fexec
for _, tc := range testCases {
makeServiceMap(fp,
makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
svc.Spec.ClusterIP = tc.svcIP
svc.Spec.Ports = []v1.ServicePort{{
Name: "p80",
Port: tc.svcPort,
Protocol: tc.protocol,
}}
svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
}
// Run the test cases
for _, tc := range testCases {
priorExecs := fexec.CommandCalls
priorGlogErrs := klog.Stats.Error.Lines()
svc := proxy.ServicePortName{
NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName},
Port: "p80",
Protocol: tc.protocol,
}
input := []proxy.ServiceEndpoint{
{
Endpoint: tc.endpoint,
ServicePortName: svc,
},
}
fp.deleteEndpointConnections(input)
// For UDP and SCTP connections, check the executed conntrack command
var expExecs int
if conntrack.IsClearConntrackNeeded(tc.protocol) {
isIPv6 := func(ip string) bool {
netIP := netutils.ParseIPSloppy(ip)
return netIP.To4() == nil
} }
if tc.protocol == UDP {
cmdOutput := "1 flow entries have been deleted"
var simErr error
// First call outputs cmdOutput and succeeds
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
// Second call may succeed or fail
if tc.simulatedErr != "" {
cmdOutput = ""
simErr = fmt.Errorf(tc.simulatedErr)
}
fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript,
func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr },
)
fexec.CommandScript = append(fexec.CommandScript, execFunc)
}
endpointIP := utilproxy.IPPart(tc.endpoint) endpointIP := utilproxy.IPPart(tc.endpoint)
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol)))) isIPv6 := netutils.IsIPv6String(endpointIP)
if isIPv6(endpointIP) {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ")
if actualCommand != expectCommand {
t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand)
}
expExecs = 1
}
// Check the number of times conntrack was executed var ipt utiliptables.Interface
execs := fexec.CommandCalls - priorExecs if isIPv6 {
if execs != expExecs { ipt = iptablestest.NewIPv6Fake()
t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs) } else {
} ipt = iptablestest.NewFake()
}
fp := NewFakeProxier(ipt)
fp.exec = fexec
// Check the number of new glog errors makeServiceMap(fp,
var expGlogErrs int64 makeTestService("ns1", tc.svcName, func(svc *v1.Service) {
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { svc.Spec.ClusterIP = tc.svcIP
expGlogErrs = 1 svc.Spec.Ports = []v1.ServicePort{{
} Name: "p80",
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs Port: tc.svcPort,
if glogErrs != expGlogErrs { Protocol: tc.protocol,
t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs) }}
} svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
}),
)
fp.svcPortMap.Update(fp.serviceChanges)
slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) {
if isIPv6 {
eps.AddressType = discovery.AddressTypeIPv6
} else {
eps.AddressType = discovery.AddressTypeIPv4
}
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{endpointIP},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String("p80"),
Port: pointer.Int32(80),
Protocol: &tc.protocol,
}}
})
// Add and then remove the endpoint slice
fp.OnEndpointSliceAdd(slice)
fp.syncProxyRules()
fp.OnEndpointSliceDelete(slice)
fp.syncProxyRules()
// Check the executed conntrack command
if tc.protocol == UDP {
if fexec.CommandCalls != 2 {
t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls)
}
// First clear conntrack entries for the clusterIP when the
// endpoint is first added.
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP)
if isIPv6 {
expectCommand += " -f ipv6"
}
actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
// Then clear conntrack entries for the endpoint when it is
// deleted.
expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP)
if isIPv6 {
expectCommand += " -f ipv6"
}
actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ")
if actualCommand != expectCommand {
t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand)
}
} else if fexec.CommandCalls != 0 {
t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls)
}
// Check the number of new glog errors
var expGlogErrs int64
if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete {
expGlogErrs = 1
}
glogErrs := klog.Stats.Error.Lines() - priorGlogErrs
if glogErrs != expGlogErrs {
t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs)
}
})
} }
} }
@ -384,10 +289,12 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine // TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method. // invocation into a Run() method.
ipfamily := v1.IPv4Protocol ipfamily := v1.IPv4Protocol
podCIDR := "10.0.0.0/8"
if ipt.IsIPv6() { if ipt.IsIPv6() {
ipfamily = v1.IPv6Protocol ipfamily = v1.IPv6Protocol
podCIDR = "fd00::/64"
} }
detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/8", ipt) detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR(podCIDR, ipt)
networkInterfacer := utilproxytest.NewFakeNetwork() networkInterfacer := utilproxytest.NewFakeNetwork()
itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0} itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0}
@ -4079,9 +3986,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
t.Errorf("expected service map length 10, got %v", fp.svcPortMap) t.Errorf("expected service map length 10, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
// The only-local-loadbalancer ones get added // The only-local-loadbalancer ones get added
@ -4117,11 +4024,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
// from the three deleted services here, we still have the ClusterIP for // from the three deleted services here, we still have the ClusterIP for
// the not-deleted service, because one of it's ServicePorts was deleted. // the not-deleted service, because one of it's ServicePorts was deleted.
expectedStaleUDPServices := []string{"172.30.55.10", "172.30.55.4", "172.30.55.11", "172.30.55.12"} expectedStaleUDPServices := []string{"172.30.55.10", "172.30.55.4", "172.30.55.11", "172.30.55.12"}
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) {
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList()) t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.UnsortedList())
} }
for _, ip := range expectedStaleUDPServices { for _, ip := range expectedStaleUDPServices {
if !result.UDPStaleClusterIP.Has(ip) { if !result.DeletedUDPClusterIPs.Has(ip) {
t.Errorf("expected stale UDP service service %s", ip) t.Errorf("expected stale UDP service service %s", ip)
} }
} }
@ -4154,8 +4061,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
@ -4182,8 +4089,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
if len(fp.svcPortMap) != 0 { if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.svcPortMap) t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
@ -4223,9 +4130,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
if len(healthCheckNodePorts) != 0 { if len(healthCheckNodePorts) != 0 {
@ -4238,8 +4145,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
if len(healthCheckNodePorts) != 1 { if len(healthCheckNodePorts) != 1 {
@ -4253,8 +4160,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
if len(healthCheckNodePorts) != 1 { if len(healthCheckNodePorts) != 1 {
@ -4267,9 +4174,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
if len(healthCheckNodePorts) != 0 { if len(healthCheckNodePorts) != 0 {
@ -4674,22 +4581,22 @@ func TestUpdateEndpointsMap(t *testing.T) {
// previousEndpoints and currentEndpoints are used to call appropriate // previousEndpoints and currentEndpoints are used to call appropriate
// handlers OnEndpoints* (based on whether corresponding values are nil // handlers OnEndpoints* (based on whether corresponding values are nil
// or non-nil) and must be of equal length. // or non-nil) and must be of equal length.
name string name string
previousEndpoints []*discovery.EndpointSlice previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]*endpointsInfo oldEndpoints map[proxy.ServicePortName][]*endpointsInfo
expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo
expectedStaleEndpoints []proxy.ServiceEndpoint expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
expectedStaleServiceNames map[proxy.ServicePortName]bool expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
expectedLocalEndpoints map[types.NamespacedName]int expectedLocalEndpoints map[types.NamespacedName]int
}{{ }{{
// Case[0]: nothing // Case[0]: nothing
name: "nothing", name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[1]: no change, named port, local // Case[1]: no change, named port, local
name: "no change, named port, local", name: "no change, named port, local",
@ -4705,8 +4612,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1, makeNSN("ns1", "ep1"): 1,
}, },
@ -4731,9 +4638,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[3]: no change, multiple subsets, multiple ports, local // Case[3]: no change, multiple subsets, multiple ports, local
name: "no change, multiple subsets, multiple ports, local", name: "no change, multiple subsets, multiple ports, local",
@ -4761,8 +4668,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1, makeNSN("ns1", "ep1"): 1,
}, },
@ -4823,8 +4730,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2, makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1, makeNSN("ns2", "ep2"): 1,
@ -4840,8 +4747,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
@ -4858,12 +4765,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11", Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[7]: add an IP and port // Case[7]: add an IP and port
name: "add an IP and port", name: "add an IP and port",
@ -4884,8 +4791,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
@ -4911,7 +4818,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.2:11", Endpoint: "10.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}, { }, {
@ -4921,8 +4828,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
Endpoint: "10.1.1.2:12", Endpoint: "10.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[9]: add a subset // Case[9]: add a subset
name: "add a subset", name: "add a subset",
@ -4941,8 +4848,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{ expectedLocalEndpoints: map[types.NamespacedName]int{
@ -4966,12 +4873,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.2:12", Endpoint: "10.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[11]: rename a port // Case[11]: rename a port
name: "rename a port", name: "rename a port",
@ -4987,11 +4894,11 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11", Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
@ -5010,12 +4917,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11", Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[13]: complex add and remove // Case[13]: complex add and remove
name: "complex add and remove", name: "complex add and remove",
@ -5058,7 +4965,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.2.2.2:22", Endpoint: "10.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, { }, {
@ -5074,7 +4981,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
Endpoint: "10.4.4.6:45", Endpoint: "10.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
@ -5093,8 +5000,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
@ -5136,33 +5043,33 @@ func TestUpdateEndpointsMap(t *testing.T) {
result := fp.endpointsMap.Update(fp.endpointsChanges) result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap newMap := fp.endpointsMap
compareEndpointsMapsExceptChainName(t, tci, newMap, tc.expectedResult) compareEndpointsMapsExceptChainName(t, tci, newMap, tc.expectedResult)
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
} }
for _, x := range tc.expectedStaleEndpoints { for _, x := range tc.expectedDeletedUDPEndpoints {
found := false found := false
for _, stale := range result.StaleEndpoints { for _, stale := range result.DeletedUDPEndpoints {
if stale == x { if stale == x {
found = true found = true
break break
} }
} }
if !found { if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
} }
} }
if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
} }
for svcName := range tc.expectedStaleServiceNames { for svcName := range tc.expectedNewlyActiveUDPServices {
found := false found := false
for _, stale := range result.StaleServiceNames { for _, stale := range result.NewlyActiveUDPServices {
if stale == svcName { if stale == svcName {
found = true found = true
} }
} }
if !found { if !found {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
} }
} }
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()

View File

@ -948,13 +948,13 @@ func (proxier *Proxier) syncProxyRules() {
// We need to detect stale connections to UDP Services so we // We need to detect stale connections to UDP Services so we
// can clean dangling conntrack entries that can blackhole traffic. // can clean dangling conntrack entries that can blackhole traffic.
conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
conntrackCleanupServiceNodePorts := sets.NewInt() conntrackCleanupServiceNodePorts := sets.NewInt()
// merge stale services gathered from updateEndpointsMap // merge stale services gathered from updateEndpointsMap
// an UDP service that changes from 0 to non-0 endpoints is considered stale. // an UDP service that changes from 0 to non-0 endpoints is considered stale.
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
conntrackCleanupServiceIPs.Insert(extIP) conntrackCleanupServiceIPs.Insert(extIP)
@ -964,7 +964,6 @@ func (proxier *Proxier) syncProxyRules() {
} }
nodePort := svcInfo.NodePort() nodePort := svcInfo.NodePort()
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort)
conntrackCleanupServiceNodePorts.Insert(nodePort) conntrackCleanupServiceNodePorts.Insert(nodePort)
} }
} }
@ -1545,8 +1544,8 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
} }
} }
klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints)
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints)
} }
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed
@ -1813,35 +1812,34 @@ func (proxier *Proxier) createAndLinkKubeChain() {
} }
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
// risk sending more traffic to it, all of which will be lost. // risk sending more traffic to it, all of which will be lost (because UDP).
// This assumes the proxier mutex is held // This assumes the proxier mutex is held
// TODO: move it to util // TODO: move it to util
func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) {
for _, epSvcPair := range connectionMap { for _, epSvcPair := range deletedUDPEndpoints {
if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok {
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
nodePort := svcInfo.NodePort() nodePort := svcInfo.NodePort()
svcProto := svcInfo.Protocol()
var err error var err error
if nodePort != 0 { if nodePort != 0 {
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
} }
} }
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
} }
for _, extIP := range svcInfo.ExternalIPStrings() { for _, extIP := range svcInfo.ExternalIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
} }
} }
for _, lbIP := range svcInfo.LoadBalancerIPStrings() { for _, lbIP := range svcInfo.LoadBalancerIPStrings() {
err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
} }

View File

@ -2543,9 +2543,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
t.Errorf("expected service map length 12, got %v", fp.svcPortMap) t.Errorf("expected service map length 12, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
// The only-local-loadbalancer ones get added // The only-local-loadbalancer ones get added
@ -2581,11 +2581,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
// from the three deleted services here, we still have the ClusterIP for // from the three deleted services here, we still have the ClusterIP for
// the not-deleted service, because one of it's ServicePorts was deleted. // the not-deleted service, because one of it's ServicePorts was deleted.
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) {
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.List()) t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.List())
} }
for _, ip := range expectedStaleUDPServices { for _, ip := range expectedStaleUDPServices {
if !result.UDPStaleClusterIP.Has(ip) { if !result.DeletedUDPClusterIPs.Has(ip) {
t.Errorf("expected stale UDP service service %s", ip) t.Errorf("expected stale UDP service service %s", ip)
} }
} }
@ -2625,8 +2625,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) {
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
@ -2655,8 +2655,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) {
if len(fp.svcPortMap) != 0 { if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.svcPortMap) t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
@ -2699,9 +2699,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
@ -2715,8 +2715,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.List())
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
@ -2731,8 +2731,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.List())
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
@ -2746,9 +2746,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
@ -3148,22 +3148,22 @@ func Test_updateEndpointsMap(t *testing.T) {
// previousEndpoints and currentEndpoints are used to call appropriate // previousEndpoints and currentEndpoints are used to call appropriate
// handlers OnEndpoints* (based on whether corresponding values are nil // handlers OnEndpoints* (based on whether corresponding values are nil
// or non-nil) and must be of equal length. // or non-nil) and must be of equal length.
name string name string
previousEndpoints []*discovery.EndpointSlice previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]*proxy.BaseEndpointInfo oldEndpoints map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo
expectedStaleEndpoints []proxy.ServiceEndpoint expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
expectedStaleServiceNames map[proxy.ServicePortName]bool expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
expectedReadyEndpoints map[types.NamespacedName]int expectedReadyEndpoints map[types.NamespacedName]int
}{{ }{{
// Case[0]: nothing // Case[0]: nothing
name: "nothing", name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[1]: no change, named port, local // Case[1]: no change, named port, local
name: "no change, named port, local", name: "no change, named port, local",
@ -3179,8 +3179,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{ expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1, makeNSN("ns1", "ep1"): 1,
}, },
@ -3205,9 +3205,9 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[3]: no change, multiple subsets, multiple ports, local // Case[3]: no change, multiple subsets, multiple ports, local
name: "no change, multiple subsets, multiple ports, local", name: "no change, multiple subsets, multiple ports, local",
@ -3235,8 +3235,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{ expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1, makeNSN("ns1", "ep1"): 1,
}, },
@ -3297,8 +3297,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "2.2.2.2:22", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "2.2.2.2:22", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{ expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2, makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1, makeNSN("ns2", "ep2"): 1,
@ -3314,8 +3314,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
}, },
expectedReadyEndpoints: map[types.NamespacedName]int{ expectedReadyEndpoints: map[types.NamespacedName]int{
@ -3332,12 +3332,12 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{},
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11", Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[7]: add an IP and port // Case[7]: add an IP and port
name: "add an IP and port", name: "add an IP and port",
@ -3358,8 +3358,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
}, },
expectedReadyEndpoints: map[types.NamespacedName]int{ expectedReadyEndpoints: map[types.NamespacedName]int{
@ -3385,7 +3385,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.2:11", Endpoint: "1.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}, { }, {
@ -3395,8 +3395,8 @@ func Test_updateEndpointsMap(t *testing.T) {
Endpoint: "1.1.1.2:12", Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[9]: add a subset // Case[9]: add a subset
name: "add a subset", name: "add a subset",
@ -3415,8 +3415,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
}, },
expectedReadyEndpoints: map[types.NamespacedName]int{ expectedReadyEndpoints: map[types.NamespacedName]int{
@ -3440,12 +3440,12 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.2:12", Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[11]: rename a port // Case[11]: rename a port
name: "rename a port", name: "rename a port",
@ -3461,11 +3461,11 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11", Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
}, },
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
@ -3484,12 +3484,12 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11", Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
}, { }, {
// Case[13]: complex add and remove // Case[13]: complex add and remove
name: "complex add and remove", name: "complex add and remove",
@ -3532,7 +3532,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "4.4.4.4:44", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "4.4.4.4:44", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{{ expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "2.2.2.2:22", Endpoint: "2.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, { }, {
@ -3548,7 +3548,7 @@ func Test_updateEndpointsMap(t *testing.T) {
Endpoint: "4.4.4.6:45", Endpoint: "4.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
}}, }},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
@ -3567,8 +3567,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
}, },
expectedStaleEndpoints: []proxy.ServiceEndpoint{}, expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedStaleServiceNames: map[proxy.ServicePortName]bool{ expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
}, },
expectedReadyEndpoints: map[types.NamespacedName]int{}, expectedReadyEndpoints: map[types.NamespacedName]int{},
@ -3612,34 +3612,34 @@ func Test_updateEndpointsMap(t *testing.T) {
result := fp.endpointsMap.Update(fp.endpointsChanges) result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap newMap := fp.endpointsMap
compareEndpointsMaps(t, tci, newMap, tc.expectedResult) compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
} }
for _, x := range tc.expectedStaleEndpoints { for _, x := range tc.expectedDeletedUDPEndpoints {
found := false found := false
for _, stale := range result.StaleEndpoints { for _, stale := range result.DeletedUDPEndpoints {
if stale == x { if stale == x {
found = true found = true
break break
} }
} }
if !found { if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
} }
} }
if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
} }
for svcName := range tc.expectedStaleServiceNames { for svcName := range tc.expectedNewlyActiveUDPServices {
found := false found := false
for _, stale := range result.StaleServiceNames { for _, stale := range result.NewlyActiveUDPServices {
if stale == svcName { if stale == svcName {
found = true found = true
break break
} }
} }
if !found { if !found {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
} }
} }
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()

View File

@ -337,15 +337,16 @@ func (sct *ServiceChangeTracker) PendingChanges() sets.String {
// UpdateServiceMapResult is the updated results after applying service changes. // UpdateServiceMapResult is the updated results after applying service changes.
type UpdateServiceMapResult struct { type UpdateServiceMapResult struct {
// UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports. // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs
// Callers can use this to abort timeout-waits or clear connection-tracking information. // that had UDP ports. Callers can use this to abort timeout-waits or clear
UDPStaleClusterIP sets.String // connection-tracking information.
DeletedUDPClusterIPs sets.String
} }
// Update updates ServicePortMap base on the given changes. // Update updates ServicePortMap base on the given changes.
func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) { func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
result.UDPStaleClusterIP = sets.NewString() result.DeletedUDPClusterIPs = sets.NewString()
sm.apply(changes, result.UDPStaleClusterIP) sm.apply(changes, result.DeletedUDPClusterIPs)
return result return result
} }
@ -398,10 +399,9 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
return svcPortMap return svcPortMap
} }
// apply the changes to ServicePortMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the // apply the changes to ServicePortMap and update the deleted UDP cluster IP set.
// udp protocol service cluster ip when service is deleted from the ServicePortMap.
// apply triggers processServiceMapChange on every change. // apply triggers processServiceMapChange on every change.
func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) { func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, deletedUDPClusterIPs sets.String) {
changes.lock.Lock() changes.lock.Lock()
defer changes.lock.Unlock() defer changes.lock.Unlock()
for _, change := range changes.items { for _, change := range changes.items {
@ -412,7 +412,7 @@ func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP
// filter out the Update event of current changes from previous changes before calling unmerge() so that can // filter out the Update event of current changes from previous changes before calling unmerge() so that can
// skip deleting the Update events. // skip deleting the Update events.
change.previous.filter(change.current) change.previous.filter(change.current)
sm.unmerge(change.previous, UDPStaleClusterIP) sm.unmerge(change.previous, deletedUDPClusterIPs)
} }
// clear changes after applying them to ServicePortMap. // clear changes after applying them to ServicePortMap.
changes.items = make(map[types.NamespacedName]*serviceChange) changes.items = make(map[types.NamespacedName]*serviceChange)
@ -467,15 +467,15 @@ func (sm *ServicePortMap) filter(other ServicePortMap) {
} }
} }
// unmerge deletes all other ServicePortMap's elements from current ServicePortMap. We pass in the UDPStaleClusterIP strings sets // unmerge deletes all other ServicePortMap's elements from current ServicePortMap and
// for storing the stale udp service cluster IPs. We will clear stale udp connection base on UDPStaleClusterIP later // updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs.
func (sm *ServicePortMap) unmerge(other ServicePortMap, UDPStaleClusterIP sets.String) { func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.String) {
for svcPortName := range other { for svcPortName := range other {
info, exists := (*sm)[svcPortName] info, exists := (*sm)[svcPortName]
if exists { if exists {
klog.V(4).InfoS("Removing service port", "portName", svcPortName) klog.V(4).InfoS("Removing service port", "portName", svcPortName)
if info.Protocol() == v1.ProtocolUDP { if info.Protocol() == v1.ProtocolUDP {
UDPStaleClusterIP.Insert(info.ClusterIP().String()) deletedUDPClusterIPs.Insert(info.ClusterIP().String())
} }
delete(*sm, svcPortName) delete(*sm, svcPortName)
} else { } else {

View File

@ -568,8 +568,9 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
if len(fp.svcPortMap) != 0 { if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
} }
if len(result.UDPStaleClusterIP) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
@ -599,8 +600,8 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
if len(fp.svcPortMap) != 0 { if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.svcPortMap) t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
} }
// No proxied services, so no healthchecks // No proxied services, so no healthchecks
@ -673,9 +674,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
if len(fp.svcPortMap) != 8 { if len(fp.svcPortMap) != 8 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
// The only-local-loadbalancer ones get added // The only-local-loadbalancer ones get added
@ -719,12 +720,12 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
// All services but one were deleted. While you'd expect only the ClusterIPs // All services but one were deleted. While you'd expect only the ClusterIPs
// from the three deleted services here, we still have the ClusterIP for // from the three deleted services here, we still have the ClusterIP for
// the not-deleted service, because one of it's ServicePorts was deleted. // the not-deleted service, because one of it's ServicePorts was deleted.
expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} expectedDeletedUDPClusterIPs := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"}
if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { if len(result.DeletedUDPClusterIPs) != len(expectedDeletedUDPClusterIPs) {
t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList()) t.Errorf("expected stale UDP services length %d, got %v", len(expectedDeletedUDPClusterIPs), result.DeletedUDPClusterIPs.UnsortedList())
} }
for _, ip := range expectedStaleUDPServices { for _, ip := range expectedDeletedUDPClusterIPs {
if !result.UDPStaleClusterIP.Has(ip) { if !result.DeletedUDPClusterIPs.Has(ip) {
t.Errorf("expected stale UDP service service %s", ip) t.Errorf("expected stale UDP service service %s", ip)
} }
} }
@ -764,9 +765,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts()
@ -784,8 +785,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
@ -804,8 +805,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
@ -823,9 +824,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
if len(fp.svcPortMap) != 2 { if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap) t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
} }
if len(result.UDPStaleClusterIP) != 0 { if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet // Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
} }
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()

View File

@ -1202,12 +1202,12 @@ func (proxier *Proxier) syncProxyRules() {
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
staleServices := serviceUpdateResult.UDPStaleClusterIP deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
// merge stale services gathered from updateEndpointsMap // merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames { for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).InfoS("Stale udp service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
staleServices.Insert(svcInfo.ClusterIP().String()) deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String())
} }
} }
// Query HNS for endpoints and load balancers // Query HNS for endpoints and load balancers
@ -1653,7 +1653,7 @@ func (proxier *Proxier) syncProxyRules() {
// Finish housekeeping. // Finish housekeeping.
// TODO: these could be made more consistent. // TODO: these could be made more consistent.
for _, svcIP := range staleServices.UnsortedList() { for _, svcIP := range deletedUDPClusterIPs.UnsortedList() {
// TODO : Check if this is required to cleanup stale services here // TODO : Check if this is required to cleanup stale services here
klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP) klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
} }

View File

@ -100,7 +100,7 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
// is expensive to baby sit all udp connections to kubernetes services. // is expensive to baby sit all udp connections to kubernetes services.
return fmt.Errorf("error deleting conntrack entries for %s peer {%s, %s}, error: %v", protoStr(protocol), origin, dest, err) return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err)
} }
return nil return nil
} }
@ -116,12 +116,7 @@ func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protoc
parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest)
err := Exec(execer, parameters...) err := Exec(execer, parameters...)
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
return fmt.Errorf("error deleting conntrack entries for %s port: %d, error: %v", protoStr(protocol), port, err) return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
} }
return nil return nil
} }
// IsClearConntrackNeeded returns true if protocol requires conntrack cleanup for the stale connections
func IsClearConntrackNeeded(proto v1.Protocol) bool {
return proto == v1.ProtocolUDP || proto == v1.ProtocolSCTP
}

View File

@ -177,7 +177,7 @@ func TestClearUDPConntrackForPort(t *testing.T) {
} }
} }
func TestDeleteConnections(t *testing.T) { func TestDeleteUDPConnections(t *testing.T) {
fcmd := fakeexec.FakeCmd{ fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{ CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
@ -185,11 +185,6 @@ func TestDeleteConnections(t *testing.T) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
}, },
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) {
return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
},
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
}, },
} }
fexec := &fakeexec.FakeExec{ fexec := &fakeexec.FakeExec{
@ -197,9 +192,6 @@ func TestDeleteConnections(t *testing.T) {
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
}, },
LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
} }
@ -208,52 +200,30 @@ func TestDeleteConnections(t *testing.T) {
name string name string
origin string origin string
dest string dest string
proto v1.Protocol
}{ }{
{ {
name: "UDP IPv4 success", name: "IPv4 success",
origin: "1.2.3.4", origin: "1.2.3.4",
dest: "10.20.30.40", dest: "10.20.30.40",
proto: v1.ProtocolUDP,
}, },
{ {
name: "UDP IPv4 simulated failure", name: "IPv4 simulated failure",
origin: "2.3.4.5", origin: "2.3.4.5",
dest: "20.30.40.50", dest: "20.30.40.50",
proto: v1.ProtocolUDP,
}, },
{ {
name: "UDP IPv6 success", name: "IPv6 success",
origin: "fd00::600d:f00d", origin: "fd00::600d:f00d",
dest: "2001:db8::5", dest: "2001:db8::5",
proto: v1.ProtocolUDP,
},
{
name: "SCTP IPv4 success",
origin: "1.2.3.5",
dest: "10.20.30.50",
proto: v1.ProtocolSCTP,
},
{
name: "SCTP IPv4 simulated failure",
origin: "2.3.4.6",
dest: "20.30.40.60",
proto: v1.ProtocolSCTP,
},
{
name: "SCTP IPv6 success",
origin: "fd00::600d:f00d",
dest: "2001:db8::6",
proto: v1.ProtocolSCTP,
}, },
} }
svcCount := 0 svcCount := 0
for i, tc := range testCases { for i, tc := range testCases {
err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, tc.proto) err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP)
if err != nil { if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err) t.Errorf("%s test case: unexpected error: %v", tc.name, err)
} }
expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.origin, tc.dest, protoStr(tc.proto)) + familyParamStr(utilnet.IsIPv6String(tc.origin)) expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand { if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
@ -265,46 +235,36 @@ func TestDeleteConnections(t *testing.T) {
} }
} }
func TestClearConntrackForPortNAT(t *testing.T) { func TestClearUDPConntrackForPortNAT(t *testing.T) {
fcmd := fakeexec.FakeCmd{ fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeAction{ CombinedOutputScript: []fakeexec.FakeAction{
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil },
}, },
} }
fexec := &fakeexec.FakeExec{ fexec := &fakeexec.FakeExec{
CommandScript: []fakeexec.FakeCommandAction{ CommandScript: []fakeexec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
}, },
LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
} }
testCases := []struct { testCases := []struct {
name string name string
port int port int
dest string dest string
proto v1.Protocol
}{ }{
{ {
name: "UDP IPv4 success", name: "IPv4 success",
port: 30211, port: 30211,
dest: "1.2.3.4", dest: "1.2.3.4",
proto: v1.ProtocolUDP,
},
{
name: "SCTP IPv4 success",
port: 30215,
dest: "1.2.3.5",
proto: v1.ProtocolSCTP,
}, },
} }
svcCount := 0 svcCount := 0
for i, tc := range testCases { for i, tc := range testCases {
err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, tc.proto) err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP)
if err != nil { if err != nil {
t.Errorf("%s test case: unexpected error: %v", tc.name, err) t.Errorf("%s test case: unexpected error: %v", tc.name, err)
} }
expectCommand := fmt.Sprintf("conntrack -D -p %s --dport %d --dst-nat %s", protoStr(tc.proto), tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest))
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
if expectCommand != execCommand { if expectCommand != execCommand {
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)