pkg/proxy: fix stale detection logic

Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
Daman Arora 2023-08-27 19:05:13 +05:30
parent 01df59a73b
commit 2e5f17166b
3 changed files with 86 additions and 15 deletions

View File

@ -407,20 +407,18 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap,
} }
for _, ep := range epList { for _, ep := range epList {
// If the old endpoint wasn't Ready then there can't be stale // If the old endpoint wasn't Serving then there can't be stale
// conntrack entries since there was no traffic sent to it. // conntrack entries since there was no traffic sent to it.
if !ep.IsReady() { if !ep.IsServing() {
continue continue
} }
deleted := true deleted := true
// Check if the endpoint has changed, including if it went from // Check if the endpoint has changed, including if it went from
// ready to not ready. If it did change stale entries for the old // serving to not serving. If it did change stale entries for the old
// endpoint have to be cleared. // endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] { for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].String() == ep.String() && if newEndpointsMap[svcPortName][i].String() == ep.String() {
newEndpointsMap[svcPortName][i].IsReady() == ep.IsReady() &&
newEndpointsMap[svcPortName][i].GetIsLocal() == ep.GetIsLocal() {
deleted = false deleted = false
break break
} }
@ -441,21 +439,21 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap,
continue continue
} }
epReady := 0 epServing := 0
for _, ep := range epList { for _, ep := range epList {
if ep.IsReady() { if ep.IsServing() {
epReady++ epServing++
} }
} }
oldEpReady := 0 oldEpServing := 0
for _, ep := range oldEndpointsMap[svcPortName] { for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsReady() { if ep.IsServing() {
oldEpReady++ oldEpServing++
} }
} }
if epReady > 0 && oldEpReady == 0 { if epServing > 0 && oldEpServing == 0 {
*newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName) *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
} }
} }

View File

@ -200,6 +200,36 @@ func TestUpdateEndpointsMap(t *testing.T) {
Protocol: &udp, Protocol: &udp,
}} }}
} }
unnamedPortReady := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"},
Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(true),
Serving: pointer.Bool(true),
Terminating: pointer.Bool(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String(""),
Port: pointer.Int32(11),
Protocol: &udp,
}}
}
unnamedPortTerminating := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"},
Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(false),
Serving: pointer.Bool(true),
Terminating: pointer.Bool(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String(""),
Port: pointer.Int32(11),
Protocol: &udp,
}}
}
unnamedPortLocal := func(eps *discovery.EndpointSlice) { unnamedPortLocal := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{ eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"}, Addresses: []string{"1.1.1.1"},
@ -1033,6 +1063,49 @@ func TestUpdateEndpointsMap(t *testing.T) {
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, {
name: "change from ready to terminating pod",
previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortReady),
},
currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating),
},
previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
},
},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, {
name: "change from terminating to empty pod",
previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating),
},
currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint),
},
previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true},
},
},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, },
} }

View File

@ -5268,7 +5268,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.Endpoints = []discovery.Endpoint{{ eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP}, Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{ Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(false), Serving: pointer.Bool(false),
}, },
}} }}
eps.Ports = []discovery.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
@ -5291,7 +5291,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.Endpoints = []discovery.Endpoint{{ eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP}, Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{ Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(true), Serving: pointer.Bool(true),
}, },
}} }}
eps.Ports = []discovery.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{