diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 6597f8f340d..3cecb33226c 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -293,10 +293,17 @@ type endpointsChange struct { // UpdateEndpointMapResult is the updated results after applying endpoints changes. type UpdateEndpointMapResult struct { - // StaleEndpoints identifies if an endpoints service pair is stale. - StaleEndpoints []ServiceEndpoint - // StaleServiceNames identifies if a service is stale. - StaleServiceNames []ServicePortName + // DeletedUDPEndpoints identifies UDP endpoints that have just been deleted. + // Existing conntrack NAT entries pointing to these endpoints must be deleted to + // ensure that no further traffic for the Service gets delivered to them. + DeletedUDPEndpoints []ServiceEndpoint + + // NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to + // non-0 endpoints. Existing conntrack entries caching the fact that these + // services are black holes must be deleted to ensure that traffic can immediately + // begin flowing to the new endpoints. + NewlyActiveUDPServices []ServicePortName + // List of the trigger times for all endpoints objects that changed. It's used to export the // network programming latency. // NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue. @@ -305,26 +312,24 @@ type UpdateEndpointMapResult struct { // Update updates endpointsMap base on the given changes. func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) { - result.StaleEndpoints = make([]ServiceEndpoint, 0) - result.StaleServiceNames = make([]ServicePortName, 0) + result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0) + result.NewlyActiveUDPServices = make([]ServicePortName, 0) result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) - em.apply( - changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes) + em.apply(changes, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices, &result.LastChangeTriggerTimes) + return result } // EndpointsMap maps a service name to a list of all its Endpoints. type EndpointsMap map[ServicePortName][]Endpoint -// apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument -// is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service. -// The changes map is cleared after applying them. -// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints -// that were changed and will result in syncing the proxy rules. -// apply triggers processEndpointsMapChange on every change. -func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, - staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { +// apply the changes to EndpointsMap, update the passed-in stale-conntrack-entry arrays, +// and clear the changes map. In addition it returns (via argument) and resets the +// lastChangeTriggerTimes for all endpoints that were changed and will result in syncing +// the proxy rules. apply triggers processEndpointsMapChange on every change. +func (em EndpointsMap) apply(ect *EndpointChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint, + newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { if ect == nil { return } @@ -336,7 +341,7 @@ func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]Servi } em.unmerge(change.previous) em.merge(change.current) - detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) + detectStaleConntrackEntries(change.previous, change.current, deletedUDPEndpoints, newlyActiveUDPServices) } ect.checkoutTriggerTimes(lastChangeTriggerTimes) } @@ -397,41 +402,45 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int { return eps } -// detectStaleConnections modifies and with detected stale connections. -// is used to store stale udp service in order to clear udp conntrack later. -func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) { - // Detect stale endpoints: an endpoint can have stale conntrack entries if it was receiving traffic - // and then goes unready or changes its IP address. +// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries. +// (See UpdateEndpointMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.) +func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) { + // Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but + // are no longer sending to newEndpointsMap. The proxier should make sure that + // conntrack does not accidentally route any new connections to them. for svcPortName, epList := range oldEndpointsMap { if svcPortName.Protocol != v1.ProtocolUDP { continue } for _, ep := range epList { - // if the old endpoint wasn't ready is not possible to have stale entries - // since there was no traffic sent to it. + // If the old endpoint wasn't Ready then there can't be stale + // conntrack entries since there was no traffic sent to it. if !ep.IsReady() { continue } - stale := true - // Check if the endpoint has changed, including if it went from ready to not ready. - // If it did change stale entries for the old endpoint has to be cleared. + + deleted := true + // Check if the endpoint has changed, including if it went from + // ready to not ready. If it did change stale entries for the old + // endpoint have to be cleared. for i := range newEndpointsMap[svcPortName] { if newEndpointsMap[svcPortName][i].Equal(ep) { - stale = false + deleted = false break } } - if stale { - klog.V(4).InfoS("Stale endpoint", "portName", svcPortName, "endpoint", ep) - *staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName}) + if deleted { + klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep) + *deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName}) } } } - // Detect stale services - // For udp service, if its backend changes from 0 to non-0 ready endpoints. - // There may exist a conntrack entry that could blackhole traffic to the service. + // Detect services that have gone from 0 to non-0 ready endpoints. If there were + // previously 0 endpoints, but someone tried to connect to it, then a conntrack + // entry may have been created blackholing traffic to that IP, which should be + // deleted now. for svcPortName, epList := range newEndpointsMap { if svcPortName.Protocol != v1.ProtocolUDP { continue @@ -452,7 +461,7 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale } if epReady > 0 && oldEpReady == 0 { - *staleServiceNames = append(*staleServiceNames, svcPortName) + *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName) } } } diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 3e7388bdbe3..353bd1a7042 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -498,23 +498,23 @@ func TestUpdateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpointSlice* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - name string - previousEndpoints []*discovery.EndpointSlice - currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[ServicePortName][]*BaseEndpointInfo - expectedResult map[ServicePortName][]*BaseEndpointInfo - expectedStaleEndpoints []ServiceEndpoint - expectedStaleServiceNames map[ServicePortName]bool - expectedLocalEndpoints map[types.NamespacedName]int - expectedChangedEndpoints sets.String + name string + previousEndpoints []*discovery.EndpointSlice + currentEndpoints []*discovery.EndpointSlice + oldEndpoints map[ServicePortName][]*BaseEndpointInfo + expectedResult map[ServicePortName][]*BaseEndpointInfo + expectedDeletedUDPEndpoints []ServiceEndpoint + expectedNewlyActiveUDPServices map[ServicePortName]bool + expectedLocalEndpoints map[types.NamespacedName]int + expectedChangedEndpoints sets.String }{{ - name: "empty", - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, - expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.NewString(), + name: "empty", + oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, unnamed port", previousEndpoints: []*discovery.EndpointSlice{ @@ -533,10 +533,10 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.NewString(), + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, named port, local", previousEndpoints: []*discovery.EndpointSlice{ @@ -555,8 +555,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -587,10 +587,10 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.NewString(), + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString(), }, { name: "no change, multiple slices, multiple ports, local", previousEndpoints: []*discovery.EndpointSlice{ @@ -623,8 +623,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -693,8 +693,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "2.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -714,8 +714,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ @@ -736,13 +736,13 @@ func TestUpdateEndpointsMap(t *testing.T) { }, }, expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, - expectedStaleEndpoints: []ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.NewString("ns1/ep1"), + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add an IP and port", previousEndpoints: []*discovery.EndpointSlice{ @@ -766,8 +766,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ @@ -797,7 +797,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ Endpoint: "1.1.1.2:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }, { @@ -807,9 +807,9 @@ func TestUpdateEndpointsMap(t *testing.T) { Endpoint: "1.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.NewString("ns1/ep1"), + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "add a slice to an endpoint", previousEndpoints: []*discovery.EndpointSlice{ @@ -833,8 +833,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ @@ -864,13 +864,13 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ Endpoint: "1.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.NewString("ns1/ep1"), + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "rename a port", previousEndpoints: []*discovery.EndpointSlice{ @@ -889,11 +889,11 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[ServicePortName]bool{ + expectedNewlyActiveUDPServices: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, @@ -916,13 +916,13 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.NewString("ns1/ep1"), + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.NewString("ns1/ep1"), }, { name: "complex add and remove", previousEndpoints: []*discovery.EndpointSlice{ @@ -988,7 +988,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "4.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ Endpoint: "2.2.2.2:22", ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), }, { @@ -1004,7 +1004,7 @@ func TestUpdateEndpointsMap(t *testing.T) { Endpoint: "4.4.4.6:45", ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[ServicePortName]bool{ + expectedNewlyActiveUDPServices: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, @@ -1027,8 +1027,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []ServiceEndpoint{}, - expectedStaleServiceNames: map[ServicePortName]bool{ + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, @@ -1079,33 +1079,33 @@ func TestUpdateEndpointsMap(t *testing.T) { result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMapsStr(t, newMap, tc.expectedResult) - if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) + if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { + t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) } - for _, x := range tc.expectedStaleEndpoints { + for _, x := range tc.expectedDeletedUDPEndpoints { found := false - for _, stale := range result.StaleEndpoints { + for _, stale := range result.DeletedUDPEndpoints { if stale == x { found = true break } } if !found { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) + t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) } } - if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) + if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { + t.Errorf("[%d] expected %d newlyActiveUDPServices, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) } - for svcName := range tc.expectedStaleServiceNames { + for svcName := range tc.expectedNewlyActiveUDPServices { found := false - for _, stale := range result.StaleServiceNames { - if stale == svcName { + for _, newSvcName := range result.NewlyActiveUDPServices { + if newSvcName == svcName { found = true } } if !found { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) + t.Errorf("[%d] expected newlyActiveUDPServices[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) } } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index e00144de210..937441ecaac 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -744,35 +744,34 @@ func isServiceChainName(chainString string) bool { return false } -// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost. +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held // TODO: move it to util -func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { - for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { +func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) { + for _, epSvcPair := range deletedUDPEndpoints { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() - svcProto := svcInfo.Protocol() var err error if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) } } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } @@ -834,13 +833,13 @@ func (proxier *Proxier) syncProxyRules() { // We need to detect stale connections to UDP Services so we // can clean dangling conntrack entries that can blackhole traffic. - conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP + conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs conntrackCleanupServiceNodePorts := sets.NewInt() // merge stale services gathered from updateEndpointsMap // an UDP service that changes from 0 to non-0 endpoints is considered stale. - for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) + for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok { + klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { conntrackCleanupServiceIPs.Insert(extIP) @@ -850,7 +849,6 @@ func (proxier *Proxier) syncProxyRules() { } nodePort := svcInfo.NodePort() if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort) conntrackCleanupServiceNodePorts.Insert(nodePort) } } @@ -1638,8 +1636,8 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) } } - klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) - proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) + klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints) + proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints) } func (proxier *Proxier) writeServiceToEndpointRules(svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index c6f64716063..7e2793145bb 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -60,7 +60,7 @@ import ( "k8s.io/utils/pointer" ) -func TestDeleteEndpointConnectionsIPv4(t *testing.T) { +func TestDeleteEndpointConnections(t *testing.T) { const ( UDP = v1.ProtocolUDP TCP = v1.ProtocolTCP @@ -73,8 +73,7 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { svcIP string svcPort int32 protocol v1.Protocol - endpoint string // IP:port endpoint - epSvcPair proxy.ServiceEndpoint // Will be generated by test + endpoint string // IP:port endpoint simulatedErr string }{ { @@ -104,140 +103,21 @@ func TestDeleteEndpointConnectionsIPv4(t *testing.T) { { description: "V4 UDP, nothing to delete, benign error", svcName: "v4-udp-nothing-to-delete", - svcIP: "172.30.1.1", + svcIP: "172.30.4.4", svcPort: 80, protocol: UDP, - endpoint: "10.240.0.3:80", + endpoint: "10.240.0.6:80", simulatedErr: conntrack.NoConnectionToDelete, }, { description: "V4 UDP, unexpected error, should be glogged", svcName: "v4-udp-simulated-error", - svcIP: "172.30.1.1", + svcIP: "172.30.5.5", svcPort: 80, protocol: UDP, - endpoint: "10.240.0.3:80", + endpoint: "10.240.0.7:80", simulatedErr: "simulated error", }, - } - - // Create a fake executor for the conntrack utility. This should only be - // invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } - for _, tc := range testCases { - if conntrack.IsClearConntrackNeeded(tc.protocol) { - var cmdOutput string - var simErr error - if tc.simulatedErr == "" { - cmdOutput = "1 flow entries have been deleted" - } else { - simErr = fmt.Errorf(tc.simulatedErr) - } - cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr } - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - } - } - - ipt := iptablestest.NewFake() - fp := NewFakeProxier(ipt) - fp.exec = fexec - - for _, tc := range testCases { - makeServiceMap(fp, - makeTestService("ns1", tc.svcName, func(svc *v1.Service) { - svc.Spec.ClusterIP = tc.svcIP - svc.Spec.Ports = []v1.ServicePort{{ - Name: "p80", - Port: tc.svcPort, - Protocol: tc.protocol, - }} - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - }), - ) - - fp.svcPortMap.Update(fp.serviceChanges) - } - - // Run the test cases - for _, tc := range testCases { - priorExecs := fexec.CommandCalls - priorGlogErrs := klog.Stats.Error.Lines() - - svc := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, - Port: "p80", - Protocol: tc.protocol, - } - input := []proxy.ServiceEndpoint{ - { - Endpoint: tc.endpoint, - ServicePortName: svc, - }, - } - - fp.deleteEndpointConnections(input) - - // For UDP and SCTP connections, check the executed conntrack command - var expExecs int - if conntrack.IsClearConntrackNeeded(tc.protocol) { - isIPv6 := func(ip string) bool { - netIP := netutils.ParseIPSloppy(ip) - return netIP.To4() == nil - } - endpointIP := utilproxy.IPPart(tc.endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol)))) - if isIPv6(endpointIP) { - expectCommand += " -f ipv6" - } - actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ") - if actualCommand != expectCommand { - t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand) - } - expExecs = 1 - } - - // Check the number of times conntrack was executed - execs := fexec.CommandCalls - priorExecs - if execs != expExecs { - t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs) - } - - // Check the number of new glog errors - var expGlogErrs int64 - if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { - expGlogErrs = 1 - } - glogErrs := klog.Stats.Error.Lines() - priorGlogErrs - if glogErrs != expGlogErrs { - t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs) - } - } -} - -func TestDeleteEndpointConnectionsIPv6(t *testing.T) { - const ( - UDP = v1.ProtocolUDP - TCP = v1.ProtocolTCP - SCTP = v1.ProtocolSCTP - ) - - testCases := []struct { - description string - svcName string - svcIP string - svcPort int32 - protocol v1.Protocol - endpoint string // IP:port endpoint - epSvcPair proxy.ServiceEndpoint // Will be generated by test - simulatedErr string - }{ { description: "V6 UDP", svcName: "v6-udp", @@ -264,103 +144,128 @@ func TestDeleteEndpointConnectionsIPv6(t *testing.T) { }, } - // Create a fake executor for the conntrack utility. This should only be - // invoked for UDP and SCTP connections, since no conntrack cleanup is needed for TCP - fcmd := fakeexec.FakeCmd{} - fexec := &fakeexec.FakeExec{ - LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, - } - execFunc := func(cmd string, args ...string) exec.Cmd { - return fakeexec.InitFakeCmd(&fcmd, cmd, args...) - } for _, tc := range testCases { - if conntrack.IsClearConntrackNeeded(tc.protocol) { - var cmdOutput string - var simErr error - if tc.simulatedErr == "" { - cmdOutput = "1 flow entries have been deleted" - } else { - simErr = fmt.Errorf(tc.simulatedErr) + t.Run(tc.description, func(t *testing.T) { + priorGlogErrs := klog.Stats.Error.Lines() + + // Create a fake executor for the conntrack utility. + fcmd := fakeexec.FakeCmd{} + fexec := &fakeexec.FakeExec{ + LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } - cmdFunc := func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr } - fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, cmdFunc) - fexec.CommandScript = append(fexec.CommandScript, execFunc) - } - } - - ipt := iptablestest.NewIPv6Fake() - fp := NewFakeProxier(ipt) - fp.exec = fexec - - for _, tc := range testCases { - makeServiceMap(fp, - makeTestService("ns1", tc.svcName, func(svc *v1.Service) { - svc.Spec.ClusterIP = tc.svcIP - svc.Spec.Ports = []v1.ServicePort{{ - Name: "p80", - Port: tc.svcPort, - Protocol: tc.protocol, - }} - svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal - }), - ) - - fp.svcPortMap.Update(fp.serviceChanges) - } - - // Run the test cases - for _, tc := range testCases { - priorExecs := fexec.CommandCalls - priorGlogErrs := klog.Stats.Error.Lines() - - svc := proxy.ServicePortName{ - NamespacedName: types.NamespacedName{Namespace: "ns1", Name: tc.svcName}, - Port: "p80", - Protocol: tc.protocol, - } - input := []proxy.ServiceEndpoint{ - { - Endpoint: tc.endpoint, - ServicePortName: svc, - }, - } - - fp.deleteEndpointConnections(input) - - // For UDP and SCTP connections, check the executed conntrack command - var expExecs int - if conntrack.IsClearConntrackNeeded(tc.protocol) { - isIPv6 := func(ip string) bool { - netIP := netutils.ParseIPSloppy(ip) - return netIP.To4() == nil + execFunc := func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) } + + if tc.protocol == UDP { + cmdOutput := "1 flow entries have been deleted" + var simErr error + + // First call outputs cmdOutput and succeeds + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, + func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, nil }, + ) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + + // Second call may succeed or fail + if tc.simulatedErr != "" { + cmdOutput = "" + simErr = fmt.Errorf(tc.simulatedErr) + } + fcmd.CombinedOutputScript = append(fcmd.CombinedOutputScript, + func() ([]byte, []byte, error) { return []byte(cmdOutput), nil, simErr }, + ) + fexec.CommandScript = append(fexec.CommandScript, execFunc) + } + endpointIP := utilproxy.IPPart(tc.endpoint) - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.svcIP, endpointIP, strings.ToLower(string((tc.protocol)))) - if isIPv6(endpointIP) { - expectCommand += " -f ipv6" - } - actualCommand := strings.Join(fcmd.CombinedOutputLog[fexec.CommandCalls-1], " ") - if actualCommand != expectCommand { - t.Errorf("%s: Expected command: %s, but executed %s", tc.description, expectCommand, actualCommand) - } - expExecs = 1 - } + isIPv6 := netutils.IsIPv6String(endpointIP) - // Check the number of times conntrack was executed - execs := fexec.CommandCalls - priorExecs - if execs != expExecs { - t.Errorf("%s: Expected conntrack to be executed %d times, but got %d", tc.description, expExecs, execs) - } + var ipt utiliptables.Interface + if isIPv6 { + ipt = iptablestest.NewIPv6Fake() + } else { + ipt = iptablestest.NewFake() + } + fp := NewFakeProxier(ipt) + fp.exec = fexec - // Check the number of new glog errors - var expGlogErrs int64 - if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { - expGlogErrs = 1 - } - glogErrs := klog.Stats.Error.Lines() - priorGlogErrs - if glogErrs != expGlogErrs { - t.Errorf("%s: Expected %d glogged errors, but got %d", tc.description, expGlogErrs, glogErrs) - } + makeServiceMap(fp, + makeTestService("ns1", tc.svcName, func(svc *v1.Service) { + svc.Spec.ClusterIP = tc.svcIP + svc.Spec.Ports = []v1.ServicePort{{ + Name: "p80", + Port: tc.svcPort, + Protocol: tc.protocol, + }} + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal + }), + ) + fp.svcPortMap.Update(fp.serviceChanges) + + slice := makeTestEndpointSlice("ns1", tc.svcName, 1, func(eps *discovery.EndpointSlice) { + if isIPv6 { + eps.AddressType = discovery.AddressTypeIPv6 + } else { + eps.AddressType = discovery.AddressTypeIPv4 + } + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{endpointIP}, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String("p80"), + Port: pointer.Int32(80), + Protocol: &tc.protocol, + }} + }) + + // Add and then remove the endpoint slice + fp.OnEndpointSliceAdd(slice) + fp.syncProxyRules() + fp.OnEndpointSliceDelete(slice) + fp.syncProxyRules() + + // Check the executed conntrack command + if tc.protocol == UDP { + if fexec.CommandCalls != 2 { + t.Fatalf("Expected conntrack to be executed 2 times, but got %d", fexec.CommandCalls) + } + + // First clear conntrack entries for the clusterIP when the + // endpoint is first added. + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s -p udp", tc.svcIP) + if isIPv6 { + expectCommand += " -f ipv6" + } + actualCommand := strings.Join(fcmd.CombinedOutputLog[0], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } + + // Then clear conntrack entries for the endpoint when it is + // deleted. + expectCommand = fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.svcIP, endpointIP) + if isIPv6 { + expectCommand += " -f ipv6" + } + actualCommand = strings.Join(fcmd.CombinedOutputLog[1], " ") + if actualCommand != expectCommand { + t.Errorf("Expected command: %s, but executed %s", expectCommand, actualCommand) + } + } else if fexec.CommandCalls != 0 { + t.Fatalf("Expected conntrack to be executed 0 times, but got %d", fexec.CommandCalls) + } + + // Check the number of new glog errors + var expGlogErrs int64 + if tc.simulatedErr != "" && tc.simulatedErr != conntrack.NoConnectionToDelete { + expGlogErrs = 1 + } + glogErrs := klog.Stats.Error.Lines() - priorGlogErrs + if glogErrs != expGlogErrs { + t.Errorf("Expected %d glogged errors, but got %d", expGlogErrs, glogErrs) + } + }) } } @@ -384,10 +289,12 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { // TODO: Call NewProxier after refactoring out the goroutine // invocation into a Run() method. ipfamily := v1.IPv4Protocol + podCIDR := "10.0.0.0/8" if ipt.IsIPv6() { ipfamily = v1.IPv6Protocol + podCIDR = "fd00::/64" } - detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR("10.0.0.0/8", ipt) + detectLocal, _ := proxyutiliptables.NewDetectLocalByCIDR(podCIDR, ipt) networkInterfacer := utilproxytest.NewFakeNetwork() itf := net.Interface{Index: 0, MTU: 0, Name: "lo", HardwareAddr: nil, Flags: 0} @@ -4079,9 +3986,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 10, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // The only-local-loadbalancer ones get added @@ -4117,11 +4024,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) { // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. expectedStaleUDPServices := []string{"172.30.55.10", "172.30.55.4", "172.30.55.11", "172.30.55.12"} - if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { - t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList()) + if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.UnsortedList()) } for _, ip := range expectedStaleUDPServices { - if !result.UDPStaleClusterIP.Has(ip) { + if !result.DeletedUDPClusterIPs.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } @@ -4154,8 +4061,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // No proxied services, so no healthchecks @@ -4182,8 +4089,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs) } // No proxied services, so no healthchecks healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() @@ -4223,9 +4130,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { @@ -4238,8 +4145,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 1 { @@ -4253,8 +4160,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 1 { @@ -4267,9 +4174,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { @@ -4674,22 +4581,22 @@ func TestUpdateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - name string - previousEndpoints []*discovery.EndpointSlice - currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[proxy.ServicePortName][]*endpointsInfo - expectedResult map[proxy.ServicePortName][]*endpointsInfo - expectedStaleEndpoints []proxy.ServiceEndpoint - expectedStaleServiceNames map[proxy.ServicePortName]bool - expectedLocalEndpoints map[types.NamespacedName]int + name string + previousEndpoints []*discovery.EndpointSlice + currentEndpoints []*discovery.EndpointSlice + oldEndpoints map[proxy.ServicePortName][]*endpointsInfo + expectedResult map[proxy.ServicePortName][]*endpointsInfo + expectedDeletedUDPEndpoints []proxy.ServiceEndpoint + expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool + expectedLocalEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing - name: "nothing", - oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + name: "nothing", + oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -4705,8 +4612,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -4731,9 +4638,9 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -4761,8 +4668,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -4823,8 +4730,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.2.2.2:22", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -4840,8 +4747,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ @@ -4858,12 +4765,12 @@ func TestUpdateEndpointsMap(t *testing.T) { }, }, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -4884,8 +4791,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ @@ -4911,7 +4818,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.1.1.2:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }, { @@ -4921,8 +4828,8 @@ func TestUpdateEndpointsMap(t *testing.T) { Endpoint: "10.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -4941,8 +4848,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.2:12", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{ @@ -4966,12 +4873,12 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -4987,11 +4894,11 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, @@ -5010,12 +4917,12 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -5058,7 +4965,7 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.4.4.4:44", IsLocal: true, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "10.2.2.2:22", ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), }, { @@ -5074,7 +4981,7 @@ func TestUpdateEndpointsMap(t *testing.T) { Endpoint: "10.4.4.6:45", ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, @@ -5093,8 +5000,8 @@ func TestUpdateEndpointsMap(t *testing.T) { {BaseEndpointInfo: &proxy.BaseEndpointInfo{Endpoint: "10.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, @@ -5136,33 +5043,33 @@ func TestUpdateEndpointsMap(t *testing.T) { result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMapsExceptChainName(t, tci, newMap, tc.expectedResult) - if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) + if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { + t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) } - for _, x := range tc.expectedStaleEndpoints { + for _, x := range tc.expectedDeletedUDPEndpoints { found := false - for _, stale := range result.StaleEndpoints { + for _, stale := range result.DeletedUDPEndpoints { if stale == x { found = true break } } if !found { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) + t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) } } - if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) + if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { + t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) } - for svcName := range tc.expectedStaleServiceNames { + for svcName := range tc.expectedNewlyActiveUDPServices { found := false - for _, stale := range result.StaleServiceNames { + for _, stale := range result.NewlyActiveUDPServices { if stale == svcName { found = true } } if !found { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) + t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) } } localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index d00801beb4b..cf52b2fcdce 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -948,13 +948,13 @@ func (proxier *Proxier) syncProxyRules() { // We need to detect stale connections to UDP Services so we // can clean dangling conntrack entries that can blackhole traffic. - conntrackCleanupServiceIPs := serviceUpdateResult.UDPStaleClusterIP + conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs conntrackCleanupServiceNodePorts := sets.NewInt() // merge stale services gathered from updateEndpointsMap // an UDP service that changes from 0 to non-0 endpoints is considered stale. - for _, svcPortName := range endpointUpdateResult.StaleServiceNames { - if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) + for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { + if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok { + klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { conntrackCleanupServiceIPs.Insert(extIP) @@ -964,7 +964,6 @@ func (proxier *Proxier) syncProxyRules() { } nodePort := svcInfo.NodePort() if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 { - klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "nodePort", nodePort) conntrackCleanupServiceNodePorts.Insert(nodePort) } } @@ -1545,8 +1544,8 @@ func (proxier *Proxier) syncProxyRules() { klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort) } } - klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.StaleEndpoints) - proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) + klog.V(4).InfoS("Deleting stale endpoint connections", "endpoints", endpointUpdateResult.DeletedUDPEndpoints) + proxier.deleteUDPEndpointConnections(endpointUpdateResult.DeletedUDPEndpoints) } // writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed @@ -1813,35 +1812,34 @@ func (proxier *Proxier) createAndLinkKubeChain() { } -// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we -// risk sending more traffic to it, all of which will be lost. +// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we +// risk sending more traffic to it, all of which will be lost (because UDP). // This assumes the proxier mutex is held // TODO: move it to util -func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceEndpoint) { - for _, epSvcPair := range connectionMap { - if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { +func (proxier *Proxier) deleteUDPEndpointConnections(deletedUDPEndpoints []proxy.ServiceEndpoint) { + for _, epSvcPair := range deletedUDPEndpoints { + if svcInfo, ok := proxier.svcPortMap[epSvcPair.ServicePortName]; ok { endpointIP := utilproxy.IPPart(epSvcPair.Endpoint) nodePort := svcInfo.NodePort() - svcProto := svcInfo.Protocol() var err error if nodePort != 0 { - err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, svcProto) + err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName) } } - err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, svcProto) + err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName) } for _, extIP := range svcInfo.ExternalIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, extIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP) } } for _, lbIP := range svcInfo.LoadBalancerIPStrings() { - err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, svcProto) + err := conntrack.ClearEntriesForNAT(proxier.exec, lbIP, endpointIP, v1.ProtocolUDP) if err != nil { klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP) } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index e8eed697741..4a0e515cc9b 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -2543,9 +2543,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { t.Errorf("expected service map length 12, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // The only-local-loadbalancer ones get added @@ -2581,11 +2581,11 @@ func TestBuildServiceMapAddRemove(t *testing.T) { // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} - if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { - t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.List()) + if len(result.DeletedUDPClusterIPs) != len(expectedStaleUDPServices) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.DeletedUDPClusterIPs.List()) } for _, ip := range expectedStaleUDPServices { - if !result.UDPStaleClusterIP.Has(ip) { + if !result.DeletedUDPClusterIPs.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } @@ -2625,8 +2625,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // No proxied services, so no healthchecks @@ -2655,8 +2655,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs) } // No proxied services, so no healthchecks @@ -2699,9 +2699,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() @@ -2715,8 +2715,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.List()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() @@ -2731,8 +2731,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.List()) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.List()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() @@ -2746,9 +2746,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() @@ -3148,22 +3148,22 @@ func Test_updateEndpointsMap(t *testing.T) { // previousEndpoints and currentEndpoints are used to call appropriate // handlers OnEndpoints* (based on whether corresponding values are nil // or non-nil) and must be of equal length. - name string - previousEndpoints []*discovery.EndpointSlice - currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[proxy.ServicePortName][]*proxy.BaseEndpointInfo - expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo - expectedStaleEndpoints []proxy.ServiceEndpoint - expectedStaleServiceNames map[proxy.ServicePortName]bool - expectedReadyEndpoints map[types.NamespacedName]int + name string + previousEndpoints []*discovery.EndpointSlice + currentEndpoints []*discovery.EndpointSlice + oldEndpoints map[proxy.ServicePortName][]*proxy.BaseEndpointInfo + expectedResult map[proxy.ServicePortName][]*proxy.BaseEndpointInfo + expectedDeletedUDPEndpoints []proxy.ServiceEndpoint + expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool + expectedReadyEndpoints map[types.NamespacedName]int }{{ // Case[0]: nothing - name: "nothing", - oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, - expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + name: "nothing", + oldEndpoints: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, + expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[1]: no change, named port, local name: "no change, named port, local", @@ -3179,8 +3179,8 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3205,9 +3205,9 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.2:12", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[3]: no change, multiple subsets, multiple ports, local name: "no change, multiple subsets, multiple ports, local", @@ -3235,8 +3235,8 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.3:13", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, @@ -3297,8 +3297,8 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "2.2.2.2:22", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, expectedReadyEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, @@ -3314,8 +3314,8 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{ @@ -3332,12 +3332,12 @@ func Test_updateEndpointsMap(t *testing.T) { }, }, expectedResult: map[proxy.ServicePortName][]*proxy.BaseEndpointInfo{}, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[7]: add an IP and port name: "add an IP and port", @@ -3358,8 +3358,8 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{ @@ -3385,7 +3385,7 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.2:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }, { @@ -3395,8 +3395,8 @@ func Test_updateEndpointsMap(t *testing.T) { Endpoint: "1.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[9]: add a subset name: "add a subset", @@ -3415,8 +3415,8 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.2:12", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{ @@ -3440,12 +3440,12 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.2:12", ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[11]: rename a port name: "rename a port", @@ -3461,11 +3461,11 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{}, @@ -3484,12 +3484,12 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:22", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "1.1.1.1:11", ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{}, - expectedReadyEndpoints: map[types.NamespacedName]int{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{}, + expectedReadyEndpoints: map[types.NamespacedName]int{}, }, { // Case[13]: complex add and remove name: "complex add and remove", @@ -3532,7 +3532,7 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "4.4.4.4:44", NodeName: nodeName, IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{ Endpoint: "2.2.2.2:22", ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP), }, { @@ -3548,7 +3548,7 @@ func Test_updateEndpointsMap(t *testing.T) { Endpoint: "4.4.4.6:45", ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP), }}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true, makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true, makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true, @@ -3567,8 +3567,8 @@ func Test_updateEndpointsMap(t *testing.T) { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, }, - expectedStaleEndpoints: []proxy.ServiceEndpoint{}, - expectedStaleServiceNames: map[proxy.ServicePortName]bool{ + expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true, }, expectedReadyEndpoints: map[types.NamespacedName]int{}, @@ -3612,34 +3612,34 @@ func Test_updateEndpointsMap(t *testing.T) { result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMaps(t, tci, newMap, tc.expectedResult) - if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) { - t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints) + if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { + t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) } - for _, x := range tc.expectedStaleEndpoints { + for _, x := range tc.expectedDeletedUDPEndpoints { found := false - for _, stale := range result.StaleEndpoints { + for _, stale := range result.DeletedUDPEndpoints { if stale == x { found = true break } } if !found { - t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.StaleEndpoints) + t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints) } } - if len(result.StaleServiceNames) != len(tc.expectedStaleServiceNames) { - t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedStaleServiceNames), len(result.StaleServiceNames), result.StaleServiceNames) + if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) { + t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices) } - for svcName := range tc.expectedStaleServiceNames { + for svcName := range tc.expectedNewlyActiveUDPServices { found := false - for _, stale := range result.StaleServiceNames { + for _, stale := range result.NewlyActiveUDPServices { if stale == svcName { found = true break } } if !found { - t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.StaleServiceNames) + t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices) } } localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints() diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 1d2d8071988..5832f0ba4c6 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -337,15 +337,16 @@ func (sct *ServiceChangeTracker) PendingChanges() sets.String { // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { - // UDPStaleClusterIP holds stale (no longer assigned to a Service) Service IPs that had UDP ports. - // Callers can use this to abort timeout-waits or clear connection-tracking information. - UDPStaleClusterIP sets.String + // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs + // that had UDP ports. Callers can use this to abort timeout-waits or clear + // connection-tracking information. + DeletedUDPClusterIPs sets.String } // Update updates ServicePortMap base on the given changes. func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) { - result.UDPStaleClusterIP = sets.NewString() - sm.apply(changes, result.UDPStaleClusterIP) + result.DeletedUDPClusterIPs = sets.NewString() + sm.apply(changes, result.DeletedUDPClusterIPs) return result } @@ -398,10 +399,9 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic return svcPortMap } -// apply the changes to ServicePortMap and update the stale udp cluster IP set. The UDPStaleClusterIP argument is passed in to store the -// udp protocol service cluster ip when service is deleted from the ServicePortMap. +// apply the changes to ServicePortMap and update the deleted UDP cluster IP set. // apply triggers processServiceMapChange on every change. -func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP sets.String) { +func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, deletedUDPClusterIPs sets.String) { changes.lock.Lock() defer changes.lock.Unlock() for _, change := range changes.items { @@ -412,7 +412,7 @@ func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, UDPStaleClusterIP // filter out the Update event of current changes from previous changes before calling unmerge() so that can // skip deleting the Update events. change.previous.filter(change.current) - sm.unmerge(change.previous, UDPStaleClusterIP) + sm.unmerge(change.previous, deletedUDPClusterIPs) } // clear changes after applying them to ServicePortMap. changes.items = make(map[types.NamespacedName]*serviceChange) @@ -467,15 +467,15 @@ func (sm *ServicePortMap) filter(other ServicePortMap) { } } -// unmerge deletes all other ServicePortMap's elements from current ServicePortMap. We pass in the UDPStaleClusterIP strings sets -// for storing the stale udp service cluster IPs. We will clear stale udp connection base on UDPStaleClusterIP later -func (sm *ServicePortMap) unmerge(other ServicePortMap, UDPStaleClusterIP sets.String) { +// unmerge deletes all other ServicePortMap's elements from current ServicePortMap and +// updates deletedUDPClusterIPs with all of the newly-deleted UDP cluster IPs. +func (sm *ServicePortMap) unmerge(other ServicePortMap, deletedUDPClusterIPs sets.String) { for svcPortName := range other { info, exists := (*sm)[svcPortName] if exists { klog.V(4).InfoS("Removing service port", "portName", svcPortName) if info.Protocol() == v1.ProtocolUDP { - UDPStaleClusterIP.Insert(info.ClusterIP().String()) + deletedUDPClusterIPs.Insert(info.ClusterIP().String()) } delete(*sm, svcPortName) } else { diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index c1c1e557e14..22fbadbae42 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -568,8 +568,9 @@ func TestServiceMapUpdateHeadless(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // No proxied services, so no healthchecks @@ -599,8 +600,8 @@ func TestUpdateServiceTypeExternalName(t *testing.T) { if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs) } // No proxied services, so no healthchecks @@ -673,9 +674,9 @@ func TestBuildServiceMapAddRemove(t *testing.T) { if len(fp.svcPortMap) != 8 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } // The only-local-loadbalancer ones get added @@ -719,12 +720,12 @@ func TestBuildServiceMapAddRemove(t *testing.T) { // All services but one were deleted. While you'd expect only the ClusterIPs // from the three deleted services here, we still have the ClusterIP for // the not-deleted service, because one of it's ServicePorts was deleted. - expectedStaleUDPServices := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} - if len(result.UDPStaleClusterIP) != len(expectedStaleUDPServices) { - t.Errorf("expected stale UDP services length %d, got %v", len(expectedStaleUDPServices), result.UDPStaleClusterIP.UnsortedList()) + expectedDeletedUDPClusterIPs := []string{"172.16.55.10", "172.16.55.4", "172.16.55.11", "172.16.55.12"} + if len(result.DeletedUDPClusterIPs) != len(expectedDeletedUDPClusterIPs) { + t.Errorf("expected stale UDP services length %d, got %v", len(expectedDeletedUDPClusterIPs), result.DeletedUDPClusterIPs.UnsortedList()) } - for _, ip := range expectedStaleUDPServices { - if !result.UDPStaleClusterIP.Has(ip) { + for _, ip := range expectedDeletedUDPClusterIPs { + if !result.DeletedUDPClusterIPs.Has(ip) { t.Errorf("expected stale UDP service service %s", ip) } } @@ -764,9 +765,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts := fp.svcPortMap.HealthCheckNodePorts() @@ -784,8 +785,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() @@ -804,8 +805,8 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { - t.Errorf("expected stale UDP services length 0, got %v", result.UDPStaleClusterIP.UnsortedList()) + if len(result.DeletedUDPClusterIPs) != 0 { + t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() @@ -823,9 +824,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } - if len(result.UDPStaleClusterIP) != 0 { + if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet - t.Errorf("expected stale UDP services length 0, got %d", len(result.UDPStaleClusterIP)) + t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 822f7e1453a..3c14451dbdb 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -1202,12 +1202,12 @@ func (proxier *Proxier) syncProxyRules() { serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) - staleServices := serviceUpdateResult.UDPStaleClusterIP + deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs // merge stale services gathered from updateEndpointsMap - for _, svcPortName := range endpointUpdateResult.StaleServiceNames { + for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices { if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP { - klog.V(2).InfoS("Stale udp service", "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP()) - staleServices.Insert(svcInfo.ClusterIP().String()) + klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName) + deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String()) } } // Query HNS for endpoints and load balancers @@ -1653,7 +1653,7 @@ func (proxier *Proxier) syncProxyRules() { // Finish housekeeping. // TODO: these could be made more consistent. - for _, svcIP := range staleServices.UnsortedList() { + for _, svcIP := range deletedUDPClusterIPs.UnsortedList() { // TODO : Check if this is required to cleanup stale services here klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP) } diff --git a/pkg/util/conntrack/conntrack.go b/pkg/util/conntrack/conntrack.go index 3a140c83df2..4b39f61ac79 100644 --- a/pkg/util/conntrack/conntrack.go +++ b/pkg/util/conntrack/conntrack.go @@ -100,7 +100,7 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1. // TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed. // These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it // is expensive to baby sit all udp connections to kubernetes services. - return fmt.Errorf("error deleting conntrack entries for %s peer {%s, %s}, error: %v", protoStr(protocol), origin, dest, err) + return fmt.Errorf("error deleting conntrack entries for UDP peer {%s, %s}, error: %v", origin, dest, err) } return nil } @@ -116,12 +116,7 @@ func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protoc parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest) err := Exec(execer, parameters...) if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) { - return fmt.Errorf("error deleting conntrack entries for %s port: %d, error: %v", protoStr(protocol), port, err) + return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err) } return nil } - -// IsClearConntrackNeeded returns true if protocol requires conntrack cleanup for the stale connections -func IsClearConntrackNeeded(proto v1.Protocol) bool { - return proto == v1.ProtocolUDP || proto == v1.ProtocolSCTP -} diff --git a/pkg/util/conntrack/conntrack_test.go b/pkg/util/conntrack/conntrack_test.go index d7a395cde9b..18a7aab4e7b 100644 --- a/pkg/util/conntrack/conntrack_test.go +++ b/pkg/util/conntrack/conntrack_test.go @@ -177,7 +177,7 @@ func TestClearUDPConntrackForPort(t *testing.T) { } } -func TestDeleteConnections(t *testing.T) { +func TestDeleteUDPConnections(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, @@ -185,11 +185,6 @@ func TestDeleteConnections(t *testing.T) { return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") }, func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { - return []byte(""), nil, fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted") - }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } fexec := &fakeexec.FakeExec{ @@ -197,9 +192,6 @@ func TestDeleteConnections(t *testing.T) { func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } @@ -208,52 +200,30 @@ func TestDeleteConnections(t *testing.T) { name string origin string dest string - proto v1.Protocol }{ { - name: "UDP IPv4 success", + name: "IPv4 success", origin: "1.2.3.4", dest: "10.20.30.40", - proto: v1.ProtocolUDP, }, { - name: "UDP IPv4 simulated failure", + name: "IPv4 simulated failure", origin: "2.3.4.5", dest: "20.30.40.50", - proto: v1.ProtocolUDP, }, { - name: "UDP IPv6 success", + name: "IPv6 success", origin: "fd00::600d:f00d", dest: "2001:db8::5", - proto: v1.ProtocolUDP, - }, - { - name: "SCTP IPv4 success", - origin: "1.2.3.5", - dest: "10.20.30.50", - proto: v1.ProtocolSCTP, - }, - { - name: "SCTP IPv4 simulated failure", - origin: "2.3.4.6", - dest: "20.30.40.60", - proto: v1.ProtocolSCTP, - }, - { - name: "SCTP IPv6 success", - origin: "fd00::600d:f00d", - dest: "2001:db8::6", - proto: v1.ProtocolSCTP, }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, tc.proto) + err := ClearEntriesForNAT(fexec, tc.origin, tc.dest, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p %s", tc.origin, tc.dest, protoStr(tc.proto)) + familyParamStr(utilnet.IsIPv6String(tc.origin)) + expectCommand := fmt.Sprintf("conntrack -D --orig-dst %s --dst-nat %s -p udp", tc.origin, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.origin)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand) @@ -265,46 +235,36 @@ func TestDeleteConnections(t *testing.T) { } } -func TestClearConntrackForPortNAT(t *testing.T) { +func TestClearUDPConntrackForPortNAT(t *testing.T) { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeAction{ func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, - func() ([]byte, []byte, error) { return []byte("1 flow entries have been deleted"), nil, nil }, }, } fexec := &fakeexec.FakeExec{ CommandScript: []fakeexec.FakeCommandAction{ func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, - func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, }, LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } testCases := []struct { - name string - port int - dest string - proto v1.Protocol + name string + port int + dest string }{ { - name: "UDP IPv4 success", - port: 30211, - dest: "1.2.3.4", - proto: v1.ProtocolUDP, - }, - { - name: "SCTP IPv4 success", - port: 30215, - dest: "1.2.3.5", - proto: v1.ProtocolSCTP, + name: "IPv4 success", + port: 30211, + dest: "1.2.3.4", }, } svcCount := 0 for i, tc := range testCases { - err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, tc.proto) + err := ClearEntriesForPortNAT(fexec, tc.dest, tc.port, v1.ProtocolUDP) if err != nil { t.Errorf("%s test case: unexpected error: %v", tc.name, err) } - expectCommand := fmt.Sprintf("conntrack -D -p %s --dport %d --dst-nat %s", protoStr(tc.proto), tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) + expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest)) execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ") if expectCommand != execCommand { t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)