Merge pull request #119394 from aroradaman/fix/proxy-conntrack

Fix stale conntrack flow detection logic
This commit is contained in:
Kubernetes Prow Robot 2023-09-03 14:53:46 -07:00 committed by GitHub
commit d4050a80c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 144 additions and 93 deletions

View File

@ -117,13 +117,6 @@ func (info *BaseEndpointInfo) Port() (int, error) {
return proxyutil.PortPart(info.Endpoint) return proxyutil.PortPart(info.Endpoint)
} }
// Equal is part of proxy.Endpoint interface.
func (info *BaseEndpointInfo) Equal(other Endpoint) bool {
return info.String() == other.String() &&
info.GetIsLocal() == other.GetIsLocal() &&
info.IsReady() == other.IsReady()
}
// GetNodeName returns the NodeName for this endpoint. // GetNodeName returns the NodeName for this endpoint.
func (info *BaseEndpointInfo) GetNodeName() string { func (info *BaseEndpointInfo) GetNodeName() string {
return info.NodeName return info.NodeName
@ -414,18 +407,18 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap,
} }
for _, ep := range epList { for _, ep := range epList {
// If the old endpoint wasn't Ready then there can't be stale // If the old endpoint wasn't Serving then there can't be stale
// conntrack entries since there was no traffic sent to it. // conntrack entries since there was no traffic sent to it.
if !ep.IsReady() { if !ep.IsServing() {
continue continue
} }
deleted := true deleted := true
// Check if the endpoint has changed, including if it went from // Check if the endpoint has changed, including if it went from
// ready to not ready. If it did change stale entries for the old // serving to not serving. If it did change stale entries for the old
// endpoint have to be cleared. // endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] { for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].Equal(ep) { if newEndpointsMap[svcPortName][i].String() == ep.String() {
deleted = false deleted = false
break break
} }
@ -446,21 +439,21 @@ func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap,
continue continue
} }
epReady := 0 epServing := 0
for _, ep := range epList { for _, ep := range epList {
if ep.IsReady() { if ep.IsServing() {
epReady++ epServing++
} }
} }
oldEpReady := 0 oldEpServing := 0
for _, ep := range oldEndpointsMap[svcPortName] { for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsReady() { if ep.IsServing() {
oldEpReady++ oldEpServing++
} }
} }
if epReady > 0 && oldEpReady == 0 { if epServing > 0 && oldEpServing == 0 {
*newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName) *newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
} }
} }

View File

@ -200,6 +200,36 @@ func TestUpdateEndpointsMap(t *testing.T) {
Protocol: &udp, Protocol: &udp,
}} }}
} }
unnamedPortReady := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"},
Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(true),
Serving: pointer.Bool(true),
Terminating: pointer.Bool(false),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String(""),
Port: pointer.Int32(11),
Protocol: &udp,
}}
}
unnamedPortTerminating := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"},
Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(false),
Serving: pointer.Bool(true),
Terminating: pointer.Bool(true),
},
}}
eps.Ports = []discovery.EndpointPort{{
Name: pointer.String(""),
Port: pointer.Int32(11),
Protocol: &udp,
}}
}
unnamedPortLocal := func(eps *discovery.EndpointSlice) { unnamedPortLocal := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{ eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"}, Addresses: []string{"1.1.1.1"},
@ -495,13 +525,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
} }
testCases := []struct { testCases := []struct {
// previousEndpoints and currentEndpoints are used to call appropriate // previousEndpointSlices and currentEndpointSlices are used to call appropriate
// handlers OnEndpointSlice* (based on whether corresponding values are nil // handlers OnEndpointSlice* (based on whether corresponding values are nil
// or non-nil) and must be of equal length. // or non-nil) and must be of equal length.
name string name string
previousEndpoints []*discovery.EndpointSlice previousEndpointSlices []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice currentEndpointSlices []*discovery.EndpointSlice
oldEndpoints map[ServicePortName][]*BaseEndpointInfo previousEndpointsMap map[ServicePortName][]*BaseEndpointInfo
expectedResult map[ServicePortName][]*BaseEndpointInfo expectedResult map[ServicePortName][]*BaseEndpointInfo
expectedDeletedUDPEndpoints []ServiceEndpoint expectedDeletedUDPEndpoints []ServiceEndpoint
expectedNewlyActiveUDPServices map[ServicePortName]bool expectedNewlyActiveUDPServices map[ServicePortName]bool
@ -509,7 +539,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints sets.Set[string] expectedChangedEndpoints sets.Set[string]
}{{ }{{
name: "empty", name: "empty",
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{}, expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedDeletedUDPEndpoints: []ServiceEndpoint{}, expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
@ -517,13 +547,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string](), expectedChangedEndpoints: sets.New[string](),
}, { }, {
name: "no change, unnamed port", name: "no change, unnamed port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -539,13 +569,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string](), expectedChangedEndpoints: sets.New[string](),
}, { }, {
name: "no change, named port, local", name: "no change, named port, local",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal), makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal), makeTestEndpointSlice("ns1", "ep1", 1, namedPortLocal),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
@ -563,15 +593,15 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string](), expectedChangedEndpoints: sets.New[string](),
}, { }, {
name: "no change, multiple slices", name: "no change, multiple slices",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -593,15 +623,15 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string](), expectedChangedEndpoints: sets.New[string](),
}, { }, {
name: "no change, multiple slices, multiple ports, local", name: "no change, multiple slices, multiple ports, local",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsMultiplePortsLocal_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsMultiplePortsLocal_s2),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
@ -631,17 +661,17 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string](), expectedChangedEndpoints: sets.New[string](),
}, { }, {
name: "no change, multiple services, slices, IPs, and ports", name: "no change, multiple services, slices, IPs, and ports",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2),
makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2), makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsIPsPorts1_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsIPsPorts1_s2),
makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2), makeTestEndpointSlice("ns2", "ep2", 1, multipleSubsetsIPsPorts2),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
@ -702,13 +732,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string](), expectedChangedEndpoints: sets.New[string](),
}, { }, {
name: "add an EndpointSlice", name: "add an EndpointSlice",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
nil, nil,
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal), makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{ expectedResult: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
@ -724,13 +754,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "remove an EndpointSlice", name: "remove an EndpointSlice",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal), makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortLocal),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
nil, nil,
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
}, },
@ -745,13 +775,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "add an IP and port", name: "add an IP and port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPort), makeTestEndpointSlice("ns1", "ep1", 1, namedPort),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal), makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -776,13 +806,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "remove an IP and port", name: "remove an IP and port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal), makeTestEndpointSlice("ns1", "ep1", 1, namedPortsLocalNoLocal),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPort), makeTestEndpointSlice("ns1", "ep1", 1, namedPort),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
{Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.2:11", IsLocal: true, Ready: true, Serving: true, Terminating: false},
@ -812,15 +842,15 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "add a slice to an endpoint", name: "add a slice to an endpoint",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPort), makeTestEndpointSlice("ns1", "ep1", 1, namedPort),
nil, nil,
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsWithLocal_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsetsWithLocal_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsWithLocal_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsetsWithLocal_s2),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -843,15 +873,15 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "remove a slice from an endpoint", name: "remove a slice from an endpoint",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1), makeTestEndpointSlice("ns1", "ep1", 1, multipleSubsets_s1),
makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2), makeTestEndpointSlice("ns1", "ep1", 2, multipleSubsets_s2),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPort), makeTestEndpointSlice("ns1", "ep1", 1, namedPort),
nil, nil,
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -873,13 +903,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "rename a port", name: "rename a port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPort), makeTestEndpointSlice("ns1", "ep1", 1, namedPort),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenamed), makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenamed),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -900,13 +930,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "renumber a port", name: "renumber a port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPort), makeTestEndpointSlice("ns1", "ep1", 1, namedPort),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenumbered), makeTestEndpointSlice("ns1", "ep1", 1, namedPortRenumbered),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -925,7 +955,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, { }, {
name: "complex add and remove", name: "complex add and remove",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, complexBefore1), makeTestEndpointSlice("ns1", "ep1", 1, complexBefore1),
nil, nil,
@ -938,7 +968,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeTestEndpointSlice("ns4", "ep4", 1, complexBefore4_s1), makeTestEndpointSlice("ns4", "ep4", 1, complexBefore4_s1),
makeTestEndpointSlice("ns4", "ep4", 2, complexBefore4_s2), makeTestEndpointSlice("ns4", "ep4", 2, complexBefore4_s2),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, complexAfter1_s1), makeTestEndpointSlice("ns1", "ep1", 1, complexAfter1_s1),
makeTestEndpointSlice("ns1", "ep1", 2, complexAfter1_s2), makeTestEndpointSlice("ns1", "ep1", 2, complexAfter1_s2),
@ -951,7 +981,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeTestEndpointSlice("ns4", "ep4", 1, complexAfter4), makeTestEndpointSlice("ns4", "ep4", 1, complexAfter4),
nil, nil,
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{ previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
}, },
@ -1015,13 +1045,13 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedChangedEndpoints: sets.New[string]("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), expectedChangedEndpoints: sets.New[string]("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
}, { }, {
name: "change from 0 endpoint address to 1 unnamed port", name: "change from 0 endpoint address to 1 unnamed port",
previousEndpoints: []*discovery.EndpointSlice{ previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint), makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint),
}, },
currentEndpoints: []*discovery.EndpointSlice{ currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort), makeTestEndpointSlice("ns1", "ep1", 1, unnamedPort),
}, },
oldEndpoints: map[ServicePortName][]*BaseEndpointInfo{}, previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{ expectedResult: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): { makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false}, {Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
@ -1033,6 +1063,49 @@ func TestUpdateEndpointsMap(t *testing.T) {
}, },
expectedLocalEndpoints: map[types.NamespacedName]int{}, expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"), expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, {
name: "change from ready to terminating pod",
previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortReady),
},
currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating),
},
previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: true, Serving: true, Terminating: false},
},
},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, {
name: "change from terminating to empty pod",
previousEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, unnamedPortTerminating),
},
currentEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, emptyEndpoint),
},
previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): {
{Endpoint: "1.1.1.1:11", IsLocal: false, Ready: false, Serving: true, Terminating: true},
},
},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
}, },
} }
@ -1042,23 +1115,23 @@ func TestUpdateEndpointsMap(t *testing.T) {
fp.hostname = nodeName fp.hostname = nodeName
// First check that after adding all previous versions of endpoints, // First check that after adding all previous versions of endpoints,
// the fp.oldEndpoints is as we expect. // the fp.previousEndpointsMap is as we expect.
for i := range tc.previousEndpoints { for i := range tc.previousEndpointSlices {
if tc.previousEndpoints[i] != nil { if tc.previousEndpointSlices[i] != nil {
fp.addEndpointSlice(tc.previousEndpoints[i]) fp.addEndpointSlice(tc.previousEndpointSlices[i])
} }
} }
fp.endpointsMap.Update(fp.endpointsChanges) fp.endpointsMap.Update(fp.endpointsChanges)
compareEndpointsMapsStr(t, fp.endpointsMap, tc.oldEndpoints) compareEndpointsMapsStr(t, fp.endpointsMap, tc.previousEndpointsMap)
// Now let's call appropriate handlers to get to state we want to be. // Now let's call appropriate handlers to get to state we want to be.
if len(tc.previousEndpoints) != len(tc.currentEndpoints) { if len(tc.previousEndpointSlices) != len(tc.currentEndpointSlices) {
t.Fatalf("[%d] different lengths of previous and current endpoints", tci) t.Fatalf("[%d] different lengths of previous and current endpoints", tci)
return return
} }
for i := range tc.previousEndpoints { for i := range tc.previousEndpointSlices {
prev, curr := tc.previousEndpoints[i], tc.currentEndpoints[i] prev, curr := tc.previousEndpointSlices[i], tc.currentEndpointSlices[i]
switch { switch {
case prev == nil && curr == nil: case prev == nil && curr == nil:
continue continue

View File

@ -136,19 +136,6 @@ func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.Servic
} }
} }
// Equal overrides the Equal() function implemented by proxy.BaseEndpointInfo.
func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
o, ok := other.(*endpointsInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast endpointsInfo")
return false
}
return e.Endpoint == o.Endpoint &&
e.IsLocal == o.IsLocal &&
e.ChainName == o.ChainName &&
e.Ready == o.Ready
}
// Proxier is an iptables based proxy for connections between a localhost:lport // Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends. // and services that provide the actual backends.
type Proxier struct { type Proxier struct {

View File

@ -5271,7 +5271,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.Endpoints = []discovery.Endpoint{{ eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP}, Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{ Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(false), Serving: pointer.Bool(false),
}, },
}} }}
eps.Ports = []discovery.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{
@ -5294,7 +5294,7 @@ func TestProxierDeleteNodePortStaleUDP(t *testing.T) {
eps.Endpoints = []discovery.Endpoint{{ eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{epIP}, Addresses: []string{epIP},
Conditions: discovery.EndpointConditions{ Conditions: discovery.EndpointConditions{
Ready: pointer.Bool(true), Serving: pointer.Bool(true),
}, },
}} }}
eps.Ports = []discovery.EndpointPort{{ eps.Ports = []discovery.EndpointPort{{

View File

@ -131,8 +131,6 @@ type Endpoint interface {
IP() string IP() string
// Port returns the Port part of the endpoint. // Port returns the Port part of the endpoint.
Port() (int, error) Port() (int, error)
// Equal checks if two endpoints are equal.
Equal(Endpoint) bool
// GetNodeName returns the node name for the endpoint // GetNodeName returns the node name for the endpoint
GetNodeName() string GetNodeName() string
// GetZone returns the zone for the endpoint // GetZone returns the zone for the endpoint