From c4f4e3bc43b7995954533a8997c26f53e1ed963a Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Mon, 17 Jul 2023 23:35:54 +0530 Subject: [PATCH 1/3] pkg/proxy: unit test refactoring Signed-off-by: Daman Arora --- pkg/proxy/endpoints_test.go | 116 ++++++++++++++++++------------------ 1 file changed, 58 insertions(+), 58 deletions(-) diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 37e6480f997..333ace85e52 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -495,13 +495,13 @@ func TestUpdateEndpointsMap(t *testing.T) { } testCases := []struct { - // previousEndpoints and currentEndpoints are used to call appropriate + // previousEndpointSlices and currentEndpointSlices are used to call appropriate // handlers OnEndpointSlice* (based on whether corresponding values are nil // or non-nil) and must be of equal length. name string - previousEndpoints []*discovery.EndpointSlice - currentEndpoints []*discovery.EndpointSlice - oldEndpoints map[ServicePortName][]*BaseEndpointInfo + previousEndpointSlices []*discovery.EndpointSlice + currentEndpointSlices []*discovery.EndpointSlice + previousEndpointsMap map[ServicePortName][]*BaseEndpointInfo expectedResult map[ServicePortName][]*BaseEndpointInfo expectedDeletedUDPEndpoints []ServiceEndpoint expectedNewlyActiveUDPServices map[ServicePortName]bool @@ -509,7 +509,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints sets.Set[string] }{{ name: "empty", - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, @@ -517,13 +517,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string](), }, { name: "no change, unnamed port", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -539,13 +539,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string](), }, { name: "no change, named port, local", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, @@ -563,15 +563,15 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string](), }, { name: "no change, multiple slices", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -593,15 +593,15 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string](), }, { name: "no change, multiple slices, multiple ports, local", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, @@ -631,17 +631,17 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string](), }, { name: "no change, multiple services, slices, IPs, and ports", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2), makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2), makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -702,13 +702,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string](), }, { name: "add an EndpointSlice", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ nil, }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -724,13 +724,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "remove an EndpointSlice", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ nil, }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, }, @@ -745,13 +745,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "add an IP and port", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -776,13 +776,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "remove an IP and port", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, @@ -812,15 +812,15 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "add a slice to an endpoint", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPort), nil, }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsWithLocal_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsWithLocal_s2), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -843,15 +843,15 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "remove a slice from an endpoint", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPort), nil, }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -873,13 +873,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "rename a port", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenamed), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -900,13 +900,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "renumber a port", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPort), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenumbered), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -925,7 +925,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, { name: "complex add and remove", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, complexBefore1), nil, @@ -938,7 +938,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeTestEndpointSlice("ns4", "ep4", 1, complexBefore4_s1), makeTestEndpointSlice("ns4", "ep4", 2, complexBefore4_s2), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, complexAfter1_s1), makeTestEndpointSlice("ns1", "ep1", 2, complexAfter1_s2), @@ -951,7 +951,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeTestEndpointSlice("ns4", "ep4", 1, complexAfter4), nil, }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, }, @@ -1015,13 +1015,13 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedChangedEndpoints: sets.New[string]("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), }, { name: "change from 0 endpoint address to 1 unnamed port", - previousEndpoints: []*discovery.EndpointSlice{ + previousEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint), }, - currentEndpoints: []*discovery.EndpointSlice{ + currentEndpointSlices: []*discovery.EndpointSlice{ makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), }, - oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{ makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, @@ -1042,23 +1042,23 @@ func TestUpdateEndpointsMap(t *testing.T) { fp.hostname = nodeName // First check that after adding all previous versions of endpoints, - // the fp.oldEndpoints is as we expect. - for i := range tc.previousEndpoints { - if tc.previousEndpoints[i] != nil { - fp.addEndpointSlice(tc.previousEndpoints[i]) + // the fp.previousEndpointsMap is as we expect. + for i := range tc.previousEndpointSlices { + if tc.previousEndpointSlices[i] != nil { + fp.addEndpointSlice(tc.previousEndpointSlices[i]) } } fp.endpointsMap.Update(fp.endpointsChanges) - compareEndpointsMapsStr(t, fp.endpointsMap, tc.oldEndpoints) + compareEndpointsMapsStr(t, fp.endpointsMap, tc.previousEndpointsMap) // Now let's call appropriate handlers to get to state we want to be. - if len(tc.previousEndpoints) != len(tc.currentEndpoints) { + if len(tc.previousEndpointSlices) != len(tc.currentEndpointSlices) { t.Fatalf("[%d] different lengths of previous and current endpoints", tci) return } - for i := range tc.previousEndpoints { - prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i] + for i := range tc.previousEndpointSlices { + prev, curr := tc.previousEndpointSlices[i], tc.currentEndpointSlices[i] switch { case prev == nil && curr == nil: continue From 01df59a73b99ba1e491d35ba7527dc7683db102c Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Sun, 27 Aug 2023 18:16:44 +0530 Subject: [PATCH 2/3] pkg/proxy: remove equal method from endpoint interface Signed-off-by: Daman Arora --- pkg/proxy/endpoints.go | 11 +++-------- pkg/proxy/iptables/proxier.go | 13 ------------- pkg/proxy/types.go | 2 -- 3 files changed, 3 insertions(+), 23 deletions(-) diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index fd1e9485cdb..d1c04c78d85 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -117,13 +117,6 @@ func (info *BaseEndpointInfo) Port() (int, error) { return proxyutil.PortPart(info.Endpoint) } -// Equal is part of proxy.Endpoint interface. -func (info *BaseEndpointInfo) Equal(other Endpoint) bool { - return info.String() == other.String() && - info.GetIsLocal() == other.GetIsLocal() && - info.IsReady() == other.IsReady() -} - // GetNodeName returns the NodeName for this endpoint. func (info *BaseEndpointInfo) GetNodeName() string { return info.NodeName @@ -425,7 +418,9 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, // ready to not ready. If it did change stale entries for the old // endpoint have to be cleared. for i := range newEndpointsMap[svcPortName] { - if newEndpointsMap[svcPortName][i].Equal(ep) { + if newEndpointsMap[svcPortName][i].String() == ep.String() && + newEndpointsMap[svcPortName][i].IsReady() == ep.IsReady() && + newEndpointsMap[svcPortName][i].GetIsLocal() == ep.GetIsLocal() { deleted = false break } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a6132a881d5..9f036d88a10 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -136,19 +136,6 @@ func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.Servic } } -// Equal overrides the Equal() function implemented by proxy.BaseEndpointInfo. -func (e *endpointsInfo) Equal(other proxy.Endpoint) bool { - o, ok := other.(*endpointsInfo) - if !ok { - klog.ErrorS(nil, "Failed to cast endpointsInfo") - return false - } - return e.Endpoint == o.Endpoint && - e.IsLocal == o.IsLocal && - e.ChainName == o.ChainName && - e.Ready == o.Ready -} - // Proxier is an iptables based proxy for connections between a localhost:lport // and services that provide the actual backends. type Proxier struct { diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index 7c9d19ab818..56dc3369407 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -131,8 +131,6 @@ type Endpoint interface { IP() string // Port returns the Port part of the endpoint. Port() (int, error) - // Equal checks if two endpoints are equal. - Equal(Endpoint) bool // GetNodeName returns the node name for the endpoint GetNodeName() string // GetZone returns the zone for the endpoint From 2e5f17166b97e5511774d2c35a6d3f03e18181ea Mon Sep 17 00:00:00 2001 From: Daman Arora Date: Sun, 27 Aug 2023 19:05:13 +0530 Subject: [PATCH 3/3] pkg/proxy: fix stale detection logic Signed-off-by: Daman Arora --- pkg/proxy/endpoints.go | 24 +++++----- pkg/proxy/endpoints_test.go | 73 ++++++++++++++++++++++++++++++ pkg/proxy/iptables/proxier_test.go | 4 +- 3 files changed, 86 insertions(+), 15 deletions(-) diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index d1c04c78d85..47976dbf094 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -407,20 +407,18 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, } for _, ep := range epList { - // If the old endpoint wasn't Ready then there can't be stale + // If the old endpoint wasn't Serving then there can't be stale // conntrack entries since there was no traffic sent to it. - if !ep.IsReady() { + if !ep.IsServing() { continue } deleted := true // Check if the endpoint has changed, including if it went from - // ready to not ready. If it did change stale entries for the old + // serving to not serving. If it did change stale entries for the old // endpoint have to be cleared. for i := range newEndpointsMap[svcPortName] { - if newEndpointsMap[svcPortName][i].String() == ep.String() && - newEndpointsMap[svcPortName][i].IsReady() == ep.IsReady() && - newEndpointsMap[svcPortName][i].GetIsLocal() == ep.GetIsLocal() { + if newEndpointsMap[svcPortName][i].String() == ep.String() { deleted = false break } @@ -441,21 +439,21 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, continue } - epReady := 0 + epServing := 0 for _, ep := range epList { - if ep.IsReady() { - epReady++ + if ep.IsServing() { + epServing++ } } - oldEpReady := 0 + oldEpServing := 0 for _, ep := range oldEndpointsMap[svcPortName] { - if ep.IsReady() { - oldEpReady++ + if ep.IsServing() { + oldEpServing++ } } - if epReady > 0 && oldEpReady == 0 { + if epServing > 0 && oldEpServing == 0 { *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName) } } diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 333ace85e52..824a5a91461 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -200,6 +200,36 @@ func TestUpdateEndpointsMap(t *testing.T) { Protocol: &udp, }} } + unnamedPortReady := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), + }, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String(""), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } + unnamedPortTerminating := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(false), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(true), + }, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String(""), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } unnamedPortLocal := func(eps *discovery.EndpointSlice) { eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.1"}, @@ -1033,6 +1063,49 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + }, { + name: "change from ready to terminating pod", + previousEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortReady), + }, + currentEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating), + }, + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + }, + }, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true}, + }, + }, + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + }, { + name: "change from terminating to empty pod", + previousEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating), + }, + currentEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint), + }, + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true}, + }, + }, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), + }}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 831fb1b6a30..bc3c55d37a9 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -5268,7 +5268,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) { eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, Conditions: discovery.EndpointConditions{ - Ready: pointer.Bool(false), + Serving: pointer.Bool(false), }, }} eps.Ports = []discovery.EndpointPort{{ @@ -5291,7 +5291,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) { eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, Conditions: discovery.EndpointConditions{ - Ready: pointer.Bool(true), + Serving: pointer.Bool(true), }, }} eps.Ports = []discovery.EndpointPort{{