proxy: refactor UpdateEndpointsMapResult

Signed-off-by: Daman Arora <aroradaman@gmail.com>
This commit is contained in:
Daman Arora 2024-09-18 22:52:18 +05:30
parent 1ad8880c0f
commit c398af07fa
9 changed files with 199 additions and 642 deletions

View File

@ -178,18 +178,9 @@ type UpdateEndpointsMapResult struct {
// UpdatedServices lists the names of all services with added/updated/deleted
// endpoints since the last Update.
UpdatedServices sets.Set[types.NamespacedName]
// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
// ensure that no further traffic for the Service gets delivered to them.
DeletedUDPEndpoints []ServiceEndpoint
// NewlyActiveUDPServices identifies UDP Services that have just gone from 0 to
// non-0 endpoints. Existing conntrack entries caching the fact that these
// services are black holes must be deleted to ensure that traffic can immediately
// begin flowing to the new endpoints.
NewlyActiveUDPServices []ServicePortName
// ConntrackCleanupRequired will be true if any UDP ServicePort changed endpoints, false otherwise.
// It's used to minimise conntrack cleanup calls.
ConntrackCleanupRequired bool
// List of the trigger times for all endpoints objects that changed. It's used to export the
// network programming latency.
// NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue.
@ -205,8 +196,6 @@ type EndpointsMap map[ServicePortName][]Endpoint
func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
result := UpdateEndpointsMapResult{
UpdatedServices: sets.New[types.NamespacedName](),
DeletedUDPEndpoints: make([]ServiceEndpoint, 0),
NewlyActiveUDPServices: make([]ServicePortName, 0),
LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
}
if ect == nil {
@ -222,7 +211,26 @@ func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapRes
em.unmerge(change.previous)
em.merge(change.current)
detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)
// result.ConntrackCleanupRequired should be true if any one of the UDP
// ServicePort changed endpoint. Once true, we don't update the value.
if result.ConntrackCleanupRequired {
continue
}
// Check if the changed service had any UDP ServicePort
for svcPort := range change.previous {
if svcPort.NamespacedName == nn && svcPort.Protocol == v1.ProtocolUDP {
result.ConntrackCleanupRequired = true
break
}
}
// Check if the changed service has any UDP ServicePort
for svcPort := range change.current {
if svcPort.NamespacedName == nn && svcPort.Protocol == v1.ProtocolUDP {
result.ConntrackCleanupRequired = true
break
}
}
}
ect.checkoutTriggerTimes(&result.LastChangeTriggerTimes)
@ -284,68 +292,3 @@ func (em EndpointsMap) LocalReadyEndpoints() map[types.NamespacedName]int {
}
return eps
}
// detectStaleConntrackEntries detects services that may be associated with stale conntrack entries.
// (See UpdateEndpointsMapResult.DeletedUDPEndpoints and .NewlyActiveUDPServices.)
func detectStaleConntrackEntries(oldEndpointsMap, newEndpointsMap EndpointsMap, deletedUDPEndpoints *[]ServiceEndpoint, newlyActiveUDPServices *[]ServicePortName) {
// Find the UDP endpoints that we were sending traffic to in oldEndpointsMap, but
// are no longer sending to newEndpointsMap. The proxier should make sure that
// conntrack does not accidentally route any new connections to them.
for svcPortName, epList := range oldEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}
for _, ep := range epList {
// If the old endpoint wasn't Serving then there can't be stale
// conntrack entries since there was no traffic sent to it.
if !ep.IsServing() {
continue
}
deleted := true
// Check if the endpoint has changed, including if it went from
// serving to not serving. If it did change stale entries for the old
// endpoint have to be cleared.
for i := range newEndpointsMap[svcPortName] {
if newEndpointsMap[svcPortName][i].String() == ep.String() &&
newEndpointsMap[svcPortName][i].IsServing() == ep.IsServing() {
deleted = false
break
}
}
if deleted {
klog.V(4).InfoS("Deleted endpoint may have stale conntrack entries", "portName", svcPortName, "endpoint", ep)
*deletedUDPEndpoints = append(*deletedUDPEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
}
}
}
// Detect services that have gone from 0 to non-0 ready endpoints. If there were
// previously 0 endpoints, but someone tried to connect to it, then a conntrack
// entry may have been created blackholing traffic to that IP, which should be
// deleted now.
for svcPortName, epList := range newEndpointsMap {
if svcPortName.Protocol != v1.ProtocolUDP {
continue
}
epServing := 0
for _, ep := range epList {
if ep.IsServing() {
epServing++
}
}
oldEpServing := 0
for _, ep := range oldEndpointsMap[svcPortName] {
if ep.IsServing() {
oldEpServing++
}
}
if epServing > 0 && oldEpServing == 0 {
*newlyActiveUDPServices = append(*newlyActiveUDPServices, svcPortName)
}
}
}

View File

@ -525,23 +525,21 @@ func TestUpdateEndpointsMap(t *testing.T) {
// previousEndpointSlices and currentEndpointSlices are used to call appropriate
// handlers OnEndpointSlice* (based on whether corresponding values are nil
// or non-nil) and must be of equal length.
name string
previousEndpointSlices []*discovery.EndpointSlice
currentEndpointSlices []*discovery.EndpointSlice
previousEndpointsMap map[ServicePortName][]*BaseEndpointInfo
expectedResult map[ServicePortName][]*BaseEndpointInfo
expectedDeletedUDPEndpoints []ServiceEndpoint
expectedNewlyActiveUDPServices map[ServicePortName]bool
expectedLocalEndpoints map[types.NamespacedName]int
expectedChangedEndpoints sets.Set[types.NamespacedName]
name string
previousEndpointSlices []*discovery.EndpointSlice
currentEndpointSlices []*discovery.EndpointSlice
previousEndpointsMap map[ServicePortName][]*BaseEndpointInfo
expectedResult map[ServicePortName][]*BaseEndpointInfo
expectedConntrackCleanupRequired bool
expectedLocalEndpoints map[types.NamespacedName]int
expectedChangedEndpoints sets.Set[types.NamespacedName]
}{{
name: "empty",
previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[types.NamespacedName](),
name: "empty",
previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, unnamed port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -560,10 +558,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[types.NamespacedName](),
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, named port, local",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -582,8 +579,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -614,10 +610,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.2", port: 12, endpoint: "1.1.1.2:12", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[types.NamespacedName](),
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, multiple slices, multiple ports, local",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -650,8 +645,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.3", port: 13, endpoint: "1.1.1.3:13", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -720,8 +714,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "2.2.2.2", port: 22, endpoint: "2.2.2.2:22", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
@ -741,10 +734,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -762,14 +752,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "add an IP and port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -793,10 +779,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.2", port: 12, endpoint: "1.1.1.2:12", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -824,19 +807,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}, {
Endpoint: "1.1.1.1:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}, {
Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "add a slice to an endpoint",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -860,10 +833,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.2", port: 12, endpoint: "1.1.1.2:12", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -891,13 +861,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "rename a port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -916,15 +882,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "renumber a port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -943,13 +903,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 22, endpoint: "1.1.1.1:22", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "complex add and remove",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -1015,27 +971,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "4.4.4.4", port: 44, endpoint: "4.4.4.4:44", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "2.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "2.2.2.22:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "2.2.2.3:23",
ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
}, {
Endpoint: "4.4.4.5:44",
ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
}, {
Endpoint: "4.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
@ -1054,12 +990,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "change from ready to terminating pod",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -1078,10 +1011,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: false, serving: true, terminating: true},
},
},
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "change from terminating to empty pod",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -1095,14 +1027,10 @@ func TestUpdateEndpointsMap(t *testing.T) {
{ip: "1.1.1.1", port: 11, endpoint: "1.1.1.1:11", isLocal: false, ready: false, serving: true, terminating: true},
},
},
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedDeletedUDPEndpoints: []ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
expectedConntrackCleanupRequired: true,
expectedResult: map[ServicePortName][]*BaseEndpointInfo{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
},
}
@ -1147,36 +1075,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
if !result.UpdatedServices.Equal(tc.expectedChangedEndpoints) {
t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.UnsortedList(), result.UpdatedServices.UnsortedList())
}
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired {
t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired)
}
for _, x := range tc.expectedDeletedUDPEndpoints {
found := false
for _, stale := range result.DeletedUDPEndpoints {
if stale == x {
found = true
break
}
}
if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
}
}
if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
t.Errorf("[%d] expected %d newlyActiveUDPServices, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
}
for svcName := range tc.expectedNewlyActiveUDPServices {
found := false
for _, newSvcName := range result.NewlyActiveUDPServices {
if newSvcName == svcName {
found = true
}
}
if !found {
t.Errorf("[%d] expected newlyActiveUDPServices[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
}
}
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) {
t.Errorf("[%d] expected local ready endpoints %v, got %v", tci, tc.expectedLocalEndpoints, localReadyEndpoints)

View File

@ -1594,8 +1594,10 @@ func (proxier *Proxier) syncProxyRules() {
proxier.logger.Error(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
if endpointUpdateResult.ConntrackCleanupRequired {
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
}
}
func (proxier *Proxier) writeServiceToEndpointRules(natRules proxyutil.LineBuffer, svcPortNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {

View File

@ -3564,22 +3564,20 @@ func TestUpdateEndpointsMap(t *testing.T) {
// previousEndpoints and currentEndpoints are used to call appropriate
// handlers OnEndpoints* (based on whether corresponding values are nil
// or non-nil) and must be of equal length.
name string
previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]endpointExpectation
expectedResult map[proxy.ServicePortName][]endpointExpectation
expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
expectedLocalEndpoints map[types.NamespacedName]int
name string
previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]endpointExpectation
expectedResult map[proxy.ServicePortName][]endpointExpectation
expectedConntrackCleanupRequired bool
expectedLocalEndpoints map[types.NamespacedName]int
}{{
// Case[0]: nothing
name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[1]: no change, named port, local
name: "no change, named port, local",
@ -3595,8 +3593,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3621,9 +3618,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.2:12", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[3]: no change, multiple subsets, multiple ports, local
name: "no change, multiple subsets, multiple ports, local",
@ -3651,8 +3647,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.3:13", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3713,8 +3708,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.2.2.2:22", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
@ -3730,10 +3724,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3747,13 +3738,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: true},
},
},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[7]: add an IP and port
name: "add an IP and port",
@ -3774,10 +3761,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.2:12", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3801,18 +3785,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}, {
Endpoint: "10.1.1.1:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}, {
Endpoint: "10.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[9]: add a subset
name: "add a subset",
@ -3831,10 +3805,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.2:12", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3856,12 +3827,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[11]: rename a port
name: "rename a port",
@ -3877,14 +3844,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[12]: renumber a port
name: "renumber a port",
@ -3900,12 +3861,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:22", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[13]: complex add and remove
name: "complex add and remove",
@ -3948,27 +3905,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.4.4.4:44", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "10.2.2.22:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "10.2.2.3:23",
ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
}, {
Endpoint: "10.4.4.5:44",
ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
}, {
Endpoint: "10.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
@ -3983,11 +3920,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
},
}
@ -4026,34 +3960,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
checkEndpointExpectations(t, tci, newMap, tc.expectedResult)
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
}
for _, x := range tc.expectedDeletedUDPEndpoints {
found := false
for _, stale := range result.DeletedUDPEndpoints {
if stale == x {
found = true
break
}
}
if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
}
}
if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
}
for svcName := range tc.expectedNewlyActiveUDPServices {
found := false
for _, stale := range result.NewlyActiveUDPServices {
if stale == svcName {
found = true
}
}
if !found {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
}
if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired {
t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired)
}
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) {

View File

@ -1497,8 +1497,10 @@ func (proxier *Proxier) syncProxyRules() {
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("internal").Set(float64(proxier.serviceNoLocalEndpointsInternal.Len()))
metrics.SyncProxyRulesNoLocalEndpointsTotal.WithLabelValues("external").Set(float64(proxier.serviceNoLocalEndpointsExternal.Len()))
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
if endpointUpdateResult.ConntrackCleanupRequired {
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
}
}
// writeIptablesRules write all iptables rules to proxier.natRules or proxier.FilterRules that ipvs proxier needed

View File

@ -3127,22 +3127,20 @@ func Test_updateEndpointsMap(t *testing.T) {
// previousEndpoints and currentEndpoints are used to call appropriate
// handlers OnEndpoints* (based on whether corresponding values are nil
// or non-nil) and must be of equal length.
name string
previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]endpointExpectation
expectedResult map[proxy.ServicePortName][]endpointExpectation
expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
expectedReadyEndpoints map[types.NamespacedName]int
name string
previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]endpointExpectation
expectedResult map[proxy.ServicePortName][]endpointExpectation
expectedConntrackCleanupRequired bool
expectedReadyEndpoints map[types.NamespacedName]int
}{{
// Case[0]: nothing
name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{},
name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedConntrackCleanupRequired: false,
expectedReadyEndpoints: map[types.NamespacedName]int{},
}, {
// Case[1]: no change, named port, local
name: "no change, named port, local",
@ -3158,8 +3156,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3184,9 +3181,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.2:12", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: false,
expectedReadyEndpoints: map[types.NamespacedName]int{},
}, {
// Case[3]: no change, multiple subsets, multiple ports, local
name: "no change, multiple subsets, multiple ports, local",
@ -3214,8 +3210,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.3:13", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3276,8 +3271,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "2.2.2.2:22", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
@ -3293,10 +3287,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3310,13 +3301,9 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: true},
},
},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedReadyEndpoints: map[types.NamespacedName]int{},
}, {
// Case[7]: add an IP and port
name: "add an IP and port",
@ -3337,10 +3324,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.2:12", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3364,18 +3348,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}, {
Endpoint: "1.1.1.1:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}, {
Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{},
}, {
// Case[9]: add a subset
name: "add a subset",
@ -3394,10 +3368,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.2:12", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -3419,12 +3390,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{},
}, {
// Case[11]: rename a port
name: "rename a port",
@ -3440,14 +3407,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedReadyEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{},
}, {
// Case[12]: renumber a port
name: "renumber a port",
@ -3463,12 +3424,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:22", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "1.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedReadyEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{},
}, {
// Case[13]: complex add and remove
name: "complex add and remove",
@ -3511,27 +3468,7 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "4.4.4.4:44", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "2.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "2.2.2.22:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "2.2.2.3:23",
ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
}, {
Endpoint: "4.4.4.5:44",
ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
}, {
Endpoint: "4.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
@ -3546,11 +3483,8 @@ func Test_updateEndpointsMap(t *testing.T) {
{endpoint: "1.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
},
expectedReadyEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedReadyEndpoints: map[types.NamespacedName]int{},
},
}
@ -3592,35 +3526,8 @@ func Test_updateEndpointsMap(t *testing.T) {
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
checkEndpointExpectations(t, tci, newMap, tc.expectedResult)
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
}
for _, x := range tc.expectedDeletedUDPEndpoints {
found := false
for _, stale := range result.DeletedUDPEndpoints {
if stale == x {
found = true
break
}
}
if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
}
}
if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
}
for svcName := range tc.expectedNewlyActiveUDPServices {
found := false
for _, stale := range result.NewlyActiveUDPServices {
if stale == svcName {
found = true
break
}
}
if !found {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
}
if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired {
t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired)
}
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedReadyEndpoints) {

View File

@ -1838,8 +1838,10 @@ func (proxier *Proxier) syncProxyRules() {
proxier.logger.Error(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
if endpointUpdateResult.ConntrackCleanupRequired {
// Finish housekeeping, clear stale conntrack entries for UDP Services
conntrack.CleanStaleEntries(proxier.conntrack, proxier.ipFamily, proxier.svcPortMap, proxier.endpointsMap)
}
}
func (proxier *Proxier) writeServiceToEndpointRules(tx *knftables.Transaction, svcInfo *servicePortInfo, svcChain string, endpoints []proxy.Endpoint) {

View File

@ -1969,22 +1969,20 @@ func TestUpdateEndpointsMap(t *testing.T) {
// previousEndpoints and currentEndpoints are used to call appropriate
// handlers OnEndpoints* (based on whether corresponding values are nil
// or non-nil) and must be of equal length.
name string
previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]endpointExpectation
expectedResult map[proxy.ServicePortName][]endpointExpectation
expectedDeletedUDPEndpoints []proxy.ServiceEndpoint
expectedNewlyActiveUDPServices map[proxy.ServicePortName]bool
expectedLocalEndpoints map[types.NamespacedName]int
name string
previousEndpoints []*discovery.EndpointSlice
currentEndpoints []*discovery.EndpointSlice
oldEndpoints map[proxy.ServicePortName][]endpointExpectation
expectedResult map[proxy.ServicePortName][]endpointExpectation
expectedConntrackCleanupRequired bool
expectedLocalEndpoints map[types.NamespacedName]int
}{{
// Case[0]: nothing
name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
name: "nothing",
oldEndpoints: map[proxy.ServicePortName][]endpointExpectation{},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[1]: no change, named port, local
name: "no change, named port, local",
@ -2000,8 +1998,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -2026,9 +2023,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.2:12", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[3]: no change, multiple subsets, multiple ports, local
name: "no change, multiple subsets, multiple ports, local",
@ -2056,8 +2052,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.3:13", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -2118,8 +2113,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.2.2.2:22", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedConntrackCleanupRequired: false,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
@ -2135,10 +2129,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -2152,13 +2143,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: true},
},
},
expectedResult: map[proxy.ServicePortName][]endpointExpectation{},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[7]: add an IP and port
name: "add an IP and port",
@ -2179,10 +2165,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.2:12", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -2206,18 +2189,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.2:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}, {
Endpoint: "10.1.1.1:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}, {
Endpoint: "10.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[9]: add a subset
name: "add a subset",
@ -2236,10 +2209,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.2:12", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
@ -2261,12 +2231,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.2:12",
ServicePortName: makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[11]: rename a port
name: "rename a port",
@ -2282,14 +2248,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[12]: renumber a port
name: "renumber a port",
@ -2305,12 +2265,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:22", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.1.1.1:11",
ServicePortName: makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
}, {
// Case[13]: complex add and remove
name: "complex add and remove",
@ -2353,27 +2309,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.4.4.4:44", isLocal: true},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{{
Endpoint: "10.2.2.2:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "10.2.2.22:22",
ServicePortName: makeServicePortName("ns2", "ep2", "p22", v1.ProtocolUDP),
}, {
Endpoint: "10.2.2.3:23",
ServicePortName: makeServicePortName("ns2", "ep2", "p23", v1.ProtocolUDP),
}, {
Endpoint: "10.4.4.5:44",
ServicePortName: makeServicePortName("ns4", "ep4", "p44", v1.ProtocolUDP),
}, {
Endpoint: "10.4.4.6:45",
ServicePortName: makeServicePortName("ns4", "ep4", "p45", v1.ProtocolUDP),
}},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p12", v1.ProtocolUDP): true,
makeServicePortName("ns1", "ep1", "p122", v1.ProtocolUDP): true,
makeServicePortName("ns3", "ep3", "p33", v1.ProtocolUDP): true,
},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
@ -2388,11 +2324,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
{endpoint: "10.1.1.1:11", isLocal: false},
},
},
expectedDeletedUDPEndpoints: []proxy.ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[proxy.ServicePortName]bool{
makeServicePortName("ns1", "ep1", "p11", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedConntrackCleanupRequired: true,
expectedLocalEndpoints: map[types.NamespacedName]int{},
},
}
@ -2430,34 +2363,8 @@ func TestUpdateEndpointsMap(t *testing.T) {
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
checkEndpointExpectations(t, tci, newMap, tc.expectedResult)
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
}
for _, x := range tc.expectedDeletedUDPEndpoints {
found := false
for _, stale := range result.DeletedUDPEndpoints {
if stale == x {
found = true
break
}
}
if !found {
t.Errorf("[%d] expected staleEndpoints[%v], but didn't find it: %v", tci, x, result.DeletedUDPEndpoints)
}
}
if len(result.NewlyActiveUDPServices) != len(tc.expectedNewlyActiveUDPServices) {
t.Errorf("[%d] expected %d staleServiceNames, got %d: %v", tci, len(tc.expectedNewlyActiveUDPServices), len(result.NewlyActiveUDPServices), result.NewlyActiveUDPServices)
}
for svcName := range tc.expectedNewlyActiveUDPServices {
found := false
for _, stale := range result.NewlyActiveUDPServices {
if stale == svcName {
found = true
}
}
if !found {
t.Errorf("[%d] expected staleServiceNames[%v], but didn't find it: %v", tci, svcName, result.NewlyActiveUDPServices)
}
if result.ConntrackCleanupRequired != tc.expectedConntrackCleanupRequired {
t.Errorf("[%d] expected conntrackCleanupRequired to be %t, got %t", tci, tc.expectedConntrackCleanupRequired, result.ConntrackCleanupRequired)
}
localReadyEndpoints := fp.endpointsMap.LocalReadyEndpoints()
if !reflect.DeepEqual(localReadyEndpoints, tc.expectedLocalEndpoints) {

View File

@ -1166,17 +1166,9 @@ func (proxier *Proxier) syncProxyRules() {
// We assume that if this was called, we really want to sync them,
// even if nothing changed in the meantime. In other words, callers are
// responsible for detecting no-op changes and not calling this function.
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
_ = proxier.svcPortMap.Update(proxier.serviceChanges)
_ = proxier.endpointsMap.Update(proxier.endpointsChanges)
deletedUDPClusterIPs := serviceUpdateResult.DeletedUDPClusterIPs
// merge stale services gathered from EndpointsMap.Update
for _, svcPortName := range endpointUpdateResult.NewlyActiveUDPServices {
if svcInfo, ok := proxier.svcPortMap[svcPortName]; ok && svcInfo != nil && svcInfo.Protocol() == v1.ProtocolUDP {
klog.V(2).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
deletedUDPClusterIPs.Insert(svcInfo.ClusterIP().String())
}
}
// Query HNS for endpoints and load balancers
queriedEndpoints, err := hns.getAllEndpointsByNetwork(hnsNetworkName)
if err != nil {
@ -1715,13 +1707,6 @@ func (proxier *Proxier) syncProxyRules() {
klog.ErrorS(err, "Error syncing healthcheck endpoints")
}
// Finish housekeeping.
// TODO: these could be made more consistent.
for _, svcIP := range deletedUDPClusterIPs.UnsortedList() {
// TODO : Check if this is required to cleanup stale services here
klog.V(5).InfoS("Pending delete stale service IP connections", "IP", svcIP)
}
// remove stale endpoint refcount entries
for epIP := range proxier.terminatedEndpoints {
if epToDelete := queriedEndpoints[epIP]; epToDelete != nil && epToDelete.hnsID != "" {