diff --git a/pkg/proxy/endpointschangetracker.go b/pkg/proxy/endpointschangetracker.go index 1ce3172561a..3918aae24c8 100644 --- a/pkg/proxy/endpointschangetracker.go +++ b/pkg/proxy/endpointschangetracker.go @@ -178,18 +178,9 @@ type UpdateEndpointsMapResult struct { // UpdatedServices lists the names of all services with added/updated/deleted // endpoints since the last Update. UpdatedServices sets.Set[types.NamespacedName] - - // DeletedUDPEndpoints identifies UDP endpoints that have just been deleted. - // Existing conntrack NAT entries pointing to these endpoints must be deleted to - // ensure that no further traffic for the Service gets delivered to them. - DeletedUDPEndpoints []ServiceEndpoint - - // NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to - // non-0 endpoints. Existing conntrack entries caching the fact that these - // services are black holes must be deleted to ensure that traffic can immediately - // begin flowing to the new endpoints. - NewlyActiveUDPServices []ServicePortName - + // ConntrackCleanupRequired will be true if any UDP ServicePort changed endpoints, false otherwise. + // It's used to minimise conntrack cleanup calls. + ConntrackCleanupRequired bool // List of the trigger times for all endpoints objects that changed. It's used to export the // network programming latency. // NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue. @@ -205,8 +196,6 @@ type EndpointsMap map[ServicePortName][]Endpoint func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult { result := UpdateEndpointsMapResult{ UpdatedServices: sets.New[types.NamespacedName](), - DeletedUDPEndpoints: make([]ServiceEndpoint, 0), - NewlyActiveUDPServices: make([]ServicePortName, 0), LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), } if ect == nil { @@ -222,7 +211,26 @@ func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapRes em.unmerge(change.previous) em.merge(change.current) - detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices) + + // result.ConntrackCleanupRequired should be true if any one of the UDP + // ServicePort changed endpoint. Once true, we don't update the value. + if result.ConntrackCleanupRequired { + continue + } + // Check if the changed service had any UDP ServicePort + for svcPort := range change.previous { + if svcPort.NamespacedName == nn && svcPort.Protocol == v1.ProtocolUDP { + result.ConntrackCleanupRequired = true + break + } + } + // Check if the changed service has any UDP ServicePort + for svcPort := range change.current { + if svcPort.NamespacedName == nn && svcPort.Protocol == v1.ProtocolUDP { + result.ConntrackCleanupRequired = true + break + } + } } ect.checkoutTriggerTimes(&result.LastChangeTriggerTimes) @@ -284,68 +292,3 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int { } return eps } - -// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries. -// (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.) -func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) { - // Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but - // are no longer sending to newEndpointsMap. The proxier should make sure that - // conntrack does not accidentally route any new connections to them. - for svcPortName, epList := range oldEndpointsMap { - if svcPortName.Protocol != v1.ProtocolUDP { - continue - } - - for _, ep := range epList { - // If the old endpoint wasn't Serving then there can't be stale - // conntrack entries since there was no traffic sent to it. - if !ep.IsServing() { - continue - } - - deleted := true - // Check if the endpoint has changed, including if it went from - // serving to not serving. If it did change stale entries for the old - // endpoint have to be cleared. - for i := range newEndpointsMap[svcPortName] { - if newEndpointsMap[svcPortName][i].String() == ep.String() && - newEndpointsMap[svcPortName][i].IsServing() == ep.IsServing() { - deleted = false - break - } - } - if deleted { - klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep) - *deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName}) - } - } - } - - // Detect services that have gone from 0 to non-0 ready endpoints. If there were - // previously 0 endpoints, but someone tried to connect to it, then a conntrack - // entry may have been created blackholing traffic to that IP, which should be - // deleted now. - for svcPortName, epList := range newEndpointsMap { - if svcPortName.Protocol != v1.ProtocolUDP { - continue - } - - epServing := 0 - for _, ep := range epList { - if ep.IsServing() { - epServing++ - } - } - - oldEpServing := 0 - for _, ep := range oldEndpointsMap[svcPortName] { - if ep.IsServing() { - oldEpServing++ - } - } - - if epServing > 0 && oldEpServing == 0 { - *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName) - } - } -} diff --git a/pkg/proxy/endpointschangetracker_test.go b/pkg/proxy/endpointschangetracker_test.go index dd3ddb0fd68..89940b2912d 100644 --- a/pkg/proxy/endpointschangetracker_test.go +++ b/pkg/proxy/endpointschangetracker_test.go @@ -525,23 +525,21 @@ func TestUpdateEndpointsMap(t *testing.T) { // previousEndpointSlices and currentEndpointSlices are used to call appropriate // handlers OnEndpointSlice* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - name string - previousEndpointSlices []*discovery.EndpointSlice - currentEndpointSlices []*discovery.EndpointSlice - previousEndpointsMap map[ServicePortName][]*BaseEndpointInfo - expectedResult map[ServicePortName][]*BaseEndpointInfo - expectedDeletedUDPEndpoints []ServiceEndpoint - expectedNewlyActiveUDPServices map[ServicePortName]bool - expectedLocalEndpoints map[types.NamespacedName]int - expectedChangedEndpoints sets.Set[types.NamespacedName] + name string + previousEndpointSlices []*discovery.EndpointSlice + currentEndpointSlices []*discovery.EndpointSlice + previousEndpointsMap map[ServicePortName][]*BaseEndpointInfo + expectedResult map[ServicePortName][]*BaseEndpointInfo + expectedConntrackCleanupRequired bool + expectedLocalEndpoints map[types.NamespacedName]int + expectedChangedEndpoints sets.Set[types.NamespacedName] }{{ - name: "empty", - previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{}, - expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[types.NamespacedName](), + name: "empty", + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{}, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, + expectedConntrackCleanupRequired: false, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, unnamed port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -560,10 +558,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[types.NamespacedName](), + expectedConntrackCleanupRequired: false, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, named port, local", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -582,8 +579,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -614,10 +610,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.2", port: 12, endpoint: "1.1.1.2:12", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[types.NamespacedName](), + expectedConntrackCleanupRequired: false, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, multiple slices, multiple ports, local", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -650,8 +645,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.3", port: 13, endpoint: "1.1.1.3:13", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -720,8 +714,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "2.2.2.2", port: 22, endpoint: "2.2.2.2:22", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -741,10 +734,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -762,14 +752,10 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, - expectedDeletedUDPEndpoints: []ServiceEndpoint{{ - Endpoint: "1.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "add an IP and port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -793,10 +779,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.2", port: 12, endpoint: "1.1.1.2:12", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -824,19 +807,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{{ - Endpoint: "1.1.1.2:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }, { - Endpoint: "1.1.1.1:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }, { - Endpoint: "1.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "add a slice to an endpoint", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -860,10 +833,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.2", port: 12, endpoint: "1.1.1.2:12", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -891,13 +861,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{{ - Endpoint: "1.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "rename a port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -916,15 +882,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{{ - Endpoint: "1.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, - }, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "renumber a port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -943,13 +903,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 22, endpoint: "1.1.1.1:22", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{{ - Endpoint: "1.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "complex add and remove", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -1015,27 +971,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "4.4.4.4", port: 44, endpoint: "4.4.4.4:44", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{{ - Endpoint: "2.2.2.2:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "2.2.2.22:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "2.2.2.3:23", - ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP), - }, { - Endpoint: "4.4.4.5:44", - ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP), - }, { - Endpoint: "4.4.4.6:45", - ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, - makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, @@ -1054,12 +990,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, - }, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "change from ready to terminating pod", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -1078,10 +1011,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: false, serving: true, terminating: true}, }, }, - expectedDeletedUDPEndpoints: []ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "change from terminating to empty pod", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -1095,14 +1027,10 @@ func TestUpdateEndpointsMap(t *testing.T) { {ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: false, serving: true, terminating: true}, }, }, - expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, - expectedDeletedUDPEndpoints: []ServiceEndpoint{{ - Endpoint: "1.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), + expectedConntrackCleanupRequired: true, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, } @@ -1147,36 +1075,9 @@ func TestUpdateEndpointsMap(t *testing.T) { if !result.UpdatedServices.Equal(tc.expectedChangedEndpoints) { t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.UnsortedList(), result.UpdatedServices.UnsortedList()) } - if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) + if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired { + t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired) } - for _, x := range tc.expectedDeletedUDPEndpoints { - found := false - for _, stale := range result.DeletedUDPEndpoints { - if stale == x { - found = true - break - } - } - if !found { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) - } - } - if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { - t.Errorf("[%d] expected %d newlyActiveUDPServices, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) - } - for svcName := range tc.expectedNewlyActiveUDPServices { - found := false - for _, newSvcName := range result.NewlyActiveUDPServices { - if newSvcName == svcName { - found = true - } - } - if !found { - t.Errorf("[%d] expected newlyActiveUDPServices[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) - } - } - localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { t.Errorf("[%d] expected local ready endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index eb174c86f95..086c4f30bb4 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1594,8 +1594,10 @@ func (proxier *Proxier) syncProxyRules() { proxier.logger.Error(err, "Error syncing healthcheck endpoints") } - // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) + if endpointUpdateResult.ConntrackCleanupRequired { + // Finish housekeeping, clear stale conntrack entries for UDP Services + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) + } } func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index a9d3a9e2269..939e25ae940 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -3564,22 +3564,20 @@ func TestUpdateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - name string - previousEndpoints []*discovery.EndpointSlice - currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[proxy.ServicePortName][]endpointExpectation - expectedResult map[proxy.ServicePortName][]endpointExpectation - expectedDeletedUDPEndpoints []proxy.ServiceEndpoint - expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool - expectedLocalEndpoints map[types.NamespacedName]int + name string + previousEndpoints []*discovery.EndpointSlice + currentEndpoints []*discovery.EndpointSlice + oldEndpoints map[proxy.ServicePortName][]endpointExpectation + expectedResult map[proxy.ServicePortName][]endpointExpectation + expectedConntrackCleanupRequired bool + expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing - name: "nothing", - oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, - expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + name: "nothing", + oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, + expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, + expectedConntrackCleanupRequired: false, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -3595,8 +3593,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3621,9 +3618,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.2:12", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: false, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -3651,8 +3647,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.3:13", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3713,8 +3708,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.2.2.2:22", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -3730,10 +3724,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3747,13 +3738,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -3774,10 +3761,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.2:12", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3801,18 +3785,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.2:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }, { - Endpoint: "10.1.1.1:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }, { - Endpoint: "10.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -3831,10 +3805,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.2:12", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3856,12 +3827,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -3877,14 +3844,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, - }, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", @@ -3900,12 +3861,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:22", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -3948,27 +3905,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.4.4.4:44", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.2.2.2:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "10.2.2.22:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "10.2.2.3:23", - ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP), - }, { - Endpoint: "10.4.4.5:44", - ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP), - }, { - Endpoint: "10.4.4.6:45", - ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, - makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, @@ -3983,11 +3920,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, - }, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, } @@ -4026,34 +3960,8 @@ func TestUpdateEndpointsMap(t *testing.T) { result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap checkEndpointExpectations(t, tci, newMap, tc.expectedResult) - if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) - } - for _, x := range tc.expectedDeletedUDPEndpoints { - found := false - for _, stale := range result.DeletedUDPEndpoints { - if stale == x { - found = true - break - } - } - if !found { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) - } - } - if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) - } - for svcName := range tc.expectedNewlyActiveUDPServices { - found := false - for _, stale := range result.NewlyActiveUDPServices { - if stale == svcName { - found = true - } - } - if !found { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) - } + if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired { + t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired) } localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 3b1f6fe96b9..f7c220e2def 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1497,8 +1497,10 @@ func (proxier *Proxier) syncProxyRules() { metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len())) metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len())) - // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) + if endpointUpdateResult.ConntrackCleanupRequired { + // Finish housekeeping, clear stale conntrack entries for UDP Services + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) + } } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index cf6eb6451e7..5ad9bda5803 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -3127,22 +3127,20 @@ func Test_updateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - name string - previousEndpoints []*discovery.EndpointSlice - currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[proxy.ServicePortName][]endpointExpectation - expectedResult map[proxy.ServicePortName][]endpointExpectation - expectedDeletedUDPEndpoints []proxy.ServiceEndpoint - expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool - expectedReadyEndpoints map[types.NamespacedName]int + name string + previousEndpoints []*discovery.EndpointSlice + currentEndpoints []*discovery.EndpointSlice + oldEndpoints map[proxy.ServicePortName][]endpointExpectation + expectedResult map[proxy.ServicePortName][]endpointExpectation + expectedConntrackCleanupRequired bool + expectedReadyEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing - name: "nothing", - oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, - expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + name: "nothing", + oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, + expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, + expectedConntrackCleanupRequired: false, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -3158,8 +3156,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3184,9 +3181,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: false, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -3214,8 +3210,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.3:13", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3276,8 +3271,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "2.2.2.2:22", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -3293,10 +3287,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3310,13 +3301,9 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "1.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -3337,10 +3324,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3364,18 +3348,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "1.1.1.2:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }, { - Endpoint: "1.1.1.1:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }, { - Endpoint: "1.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -3394,10 +3368,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.2:12", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3419,12 +3390,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "1.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -3440,14 +3407,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "1.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, - }, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", @@ -3463,12 +3424,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:22", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "1.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -3511,27 +3468,7 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "4.4.4.4:44", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "2.2.2.2:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "2.2.2.22:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "2.2.2.3:23", - ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP), - }, { - Endpoint: "4.4.4.5:44", - ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP), - }, { - Endpoint: "4.4.4.6:45", - ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, - makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, @@ -3546,11 +3483,8 @@ func Test_updateEndpointsMap(t *testing.T) { {endpoint: "1.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, - }, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, } @@ -3592,35 +3526,8 @@ func Test_updateEndpointsMap(t *testing.T) { result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap checkEndpointExpectations(t, tci, newMap, tc.expectedResult) - if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) - } - for _, x := range tc.expectedDeletedUDPEndpoints { - found := false - for _, stale := range result.DeletedUDPEndpoints { - if stale == x { - found = true - break - } - } - if !found { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) - } - } - if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) - } - for svcName := range tc.expectedNewlyActiveUDPServices { - found := false - for _, stale := range result.NewlyActiveUDPServices { - if stale == svcName { - found = true - break - } - } - if !found { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) - } + if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired { + t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired) } localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) { diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index 257517e5478..538869cb061 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -1838,8 +1838,10 @@ func (proxier *Proxier) syncProxyRules() { proxier.logger.Error(err, "Error syncing healthcheck endpoints") } - // Finish housekeeping, clear stale conntrack entries for UDP Services - conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) + if endpointUpdateResult.ConntrackCleanupRequired { + // Finish housekeeping, clear stale conntrack entries for UDP Services + conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap) + } } func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) { diff --git a/pkg/proxy/nftables/proxier_test.go b/pkg/proxy/nftables/proxier_test.go index 379070e871e..811b2010c45 100644 --- a/pkg/proxy/nftables/proxier_test.go +++ b/pkg/proxy/nftables/proxier_test.go @@ -1969,22 +1969,20 @@ func TestUpdateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - name string - previousEndpoints []*discovery.EndpointSlice - currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[proxy.ServicePortName][]endpointExpectation - expectedResult map[proxy.ServicePortName][]endpointExpectation - expectedDeletedUDPEndpoints []proxy.ServiceEndpoint - expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool - expectedLocalEndpoints map[types.NamespacedName]int + name string + previousEndpoints []*discovery.EndpointSlice + currentEndpoints []*discovery.EndpointSlice + oldEndpoints map[proxy.ServicePortName][]endpointExpectation + expectedResult map[proxy.ServicePortName][]endpointExpectation + expectedConntrackCleanupRequired bool + expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing - name: "nothing", - oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, - expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + name: "nothing", + oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{}, + expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, + expectedConntrackCleanupRequired: false, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -2000,8 +1998,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2026,9 +2023,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.2:12", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: false, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -2056,8 +2052,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.3:13", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2118,8 +2113,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.2.2.2:22", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedConntrackCleanupRequired: false, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -2135,10 +2129,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2152,13 +2143,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: true}, }, }, - expectedResult: map[proxy.ServicePortName][]endpointExpectation{}, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -2179,10 +2165,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.2:12", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2206,18 +2189,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.2:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }, { - Endpoint: "10.1.1.1:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }, { - Endpoint: "10.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -2236,10 +2209,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.2:12", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -2261,12 +2231,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.2:12", - ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -2282,14 +2248,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, - }, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[12]: renumber a port name: "renumber a port", @@ -2305,12 +2265,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:22", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.1.1.1:11", - ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -2353,27 +2309,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.4.4.4:44", isLocal: true}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ - Endpoint: "10.2.2.2:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "10.2.2.22:22", - ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), - }, { - Endpoint: "10.2.2.3:23", - ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP), - }, { - Endpoint: "10.4.4.5:44", - ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP), - }, { - Endpoint: "10.4.4.6:45", - ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), - }}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, - makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, - makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, - }, + expectedConntrackCleanupRequired: true, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, @@ -2388,11 +2324,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {endpoint: "10.1.1.1:11", isLocal: false}, }, }, - expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, - expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ - makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, - }, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedConntrackCleanupRequired: true, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, } @@ -2430,34 +2363,8 @@ func TestUpdateEndpointsMap(t *testing.T) { result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap checkEndpointExpectations(t, tci, newMap, tc.expectedResult) - if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) - } - for _, x := range tc.expectedDeletedUDPEndpoints { - found := false - for _, stale := range result.DeletedUDPEndpoints { - if stale == x { - found = true - break - } - } - if !found { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) - } - } - if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) - } - for svcName := range tc.expectedNewlyActiveUDPServices { - found := false - for _, stale := range result.NewlyActiveUDPServices { - if stale == svcName { - found = true - } - } - if !found { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) - } + if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired { + t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired) } localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) { diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index aa456ab00cd..92a8969c98f 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -1166,17 +1166,9 @@ func (proxier *Proxier) syncProxyRules() { // We assume that if this was called, we really want to sync them, // even if nothing changed in the meantime. In other words, callers are // responsible for detecting no-op changes and not calling this function. - serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) - endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) + _ = proxier.svcPortMap.Update(proxier.serviceChanges) + _ = proxier.endpointsMap.Update(proxier.endpointsChanges) - deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs - // merge stale services gathered from EndpointsMap.Update - for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { - klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) - deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String()) - } - } // Query HNS for endpoints and load balancers queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName) if err != nil { @@ -1715,13 +1707,6 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Error syncing healthcheck endpoints") } - // Finish housekeeping. - // TODO: these could be made more consistent. - for _, svcIP := range deletedUDPClusterIPs.UnsortedList() { - // TODO : Check if this is required to cleanup stale services here - klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP) - } - // remove stale endpoint refcount entries for epIP := range proxier.terminatedEndpoints { if epToDelete := queriedEndpoints[epIP]; epToDelete != nil && epToDelete.hnsID != "" {