diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index d1c04c78d85..47976dbf094 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -407,20 +407,18 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, } for _, ep := range epList { - // If the old endpoint wasn't Ready then there can't be stale + // If the old endpoint wasn't Serving then there can't be stale // conntrack entries since there was no traffic sent to it. - if !ep.IsReady() { + if !ep.IsServing() { continue } deleted := true // Check if the endpoint has changed, including if it went from - // ready to not ready. If it did change stale entries for the old + // serving to not serving. If it did change stale entries for the old // endpoint have to be cleared. for i := range newEndpointsMap[svcPortName] { - if newEndpointsMap[svcPortName][i].String() == ep.String() && - newEndpointsMap[svcPortName][i].IsReady() == ep.IsReady() && - newEndpointsMap[svcPortName][i].GetIsLocal() == ep.GetIsLocal() { + if newEndpointsMap[svcPortName][i].String() == ep.String() { deleted = false break } @@ -441,21 +439,21 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, continue } - epReady := 0 + epServing := 0 for _, ep := range epList { - if ep.IsReady() { - epReady++ + if ep.IsServing() { + epServing++ } } - oldEpReady := 0 + oldEpServing := 0 for _, ep := range oldEndpointsMap[svcPortName] { - if ep.IsReady() { - oldEpReady++ + if ep.IsServing() { + oldEpServing++ } } - if epReady > 0 && oldEpReady == 0 { + if epServing > 0 && oldEpServing == 0 { *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName) } } diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 333ace85e52..824a5a91461 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -200,6 +200,36 @@ func TestUpdateEndpointsMap(t *testing.T) { Protocol: &udp, }} } + unnamedPortReady := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(true), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(false), + }, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String(""), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } + unnamedPortTerminating := func(eps *discovery.EndpointSlice) { + eps.Endpoints = []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + Conditions: discovery.EndpointConditions{ + Ready: pointer.Bool(false), + Serving: pointer.Bool(true), + Terminating: pointer.Bool(true), + }, + }} + eps.Ports = []discovery.EndpointPort{{ + Name: pointer.String(""), + Port: pointer.Int32(11), + Protocol: &udp, + }} + } unnamedPortLocal := func(eps *discovery.EndpointSlice) { eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{"1.1.1.1"}, @@ -1033,6 +1063,49 @@ func TestUpdateEndpointsMap(t *testing.T) { }, expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + }, { + name: "change from ready to terminating pod", + previousEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortReady), + }, + currentEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating), + }, + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + }, + }, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true}, + }, + }, + expectedDeletedUDPEndpoints: []ServiceEndpoint{}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + }, { + name: "change from terminating to empty pod", + previousEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating), + }, + currentEndpointSlices: []*discovery.EndpointSlice{ + makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint), + }, + previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { + {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true}, + }, + }, + expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, + expectedDeletedUDPEndpoints: []ServiceEndpoint{{ + Endpoint: "1.1.1.1:11", + ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP), + }}, + expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, + expectedLocalEndpoints: map[types.NamespacedName]int{}, + expectedChangedEndpoints: sets.New[string]("ns1/ep1"), }, } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 831fb1b6a30..bc3c55d37a9 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -5268,7 +5268,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) { eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, Conditions: discovery.EndpointConditions{ - Ready: pointer.Bool(false), + Serving: pointer.Bool(false), }, }} eps.Ports = []discovery.EndpointPort{{ @@ -5291,7 +5291,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) { eps.Endpoints = []discovery.Endpoint{{ Addresses: []string{epIP}, Conditions: discovery.EndpointConditions{ - Ready: pointer.Bool(true), + Serving: pointer.Bool(true), }, }} eps.Ports = []discovery.EndpointPort{{