diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 0c21ffcc830..56348643473 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -205,16 +205,9 @@ func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery. return changeNeeded } -// PendingChanges returns a set whose keys are the names of the services whose endpoints -// have changed since the last time ect was used to update an EndpointsMap. (You must call -// this _before_ calling em.Update(ect).) -func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] { - return ect.endpointSliceCache.pendingChanges() -} - -// checkoutChanges returns a list of pending endpointsChanges and marks them as +// checkoutChanges returns a map of pending endpointsChanges and marks them as // applied. -func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange { +func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange { metrics.EndpointChangesPending.Set(0) return ect.endpointSliceCache.checkoutChanges() @@ -269,6 +262,10 @@ type endpointsChange struct { // UpdateEndpointsMapResult is the updated results after applying endpoints changes. type UpdateEndpointsMapResult struct { + // UpdatedServices lists the names of all services with added/updated/deleted + // endpoints since the last Update. + UpdatedServices sets.Set[types.NamespacedName] + // DeletedUDPEndpoints identifies UDP endpoints that have just been deleted. // Existing conntrack NAT entries pointing to these endpoints must be deleted to // ensure that no further traffic for the Service gets delivered to them. @@ -294,6 +291,7 @@ type EndpointsMap map[ServicePortName][]Endpoint // changes map. func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult { result := UpdateEndpointsMapResult{ + UpdatedServices: sets.New[types.NamespacedName](), DeletedUDPEndpoints: make([]ServiceEndpoint, 0), NewlyActiveUDPServices: make([]ServicePortName, 0), LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), @@ -303,10 +301,12 @@ func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapRes } changes := ect.checkoutChanges() - for _, change := range changes { + for nn, change := range changes { if ect.processEndpointsMapChange != nil { ect.processEndpointsMapChange(change.previous, change.current) } + result.UpdatedServices.Insert(nn) + em.unmerge(change.previous) em.merge(change.current) detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices) diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 2dae2eefb27..a6cdcbbda5b 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -533,7 +533,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedDeletedUDPEndpoints []ServiceEndpoint expectedNewlyActiveUDPServices map[ServicePortName]bool expectedLocalEndpoints map[types.NamespacedName]int - expectedChangedEndpoints sets.Set[string] + expectedChangedEndpoints sets.Set[types.NamespacedName] }{{ name: "empty", previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{}, @@ -541,7 +541,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedDeletedUDPEndpoints: []ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string](), + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, unnamed port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -563,7 +563,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedDeletedUDPEndpoints: []ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string](), + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, named port, local", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -587,7 +587,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, - expectedChangedEndpoints: sets.New[string](), + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, multiple slices", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -617,7 +617,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedDeletedUDPEndpoints: []ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string](), + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, multiple slices, multiple ports, local", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -655,7 +655,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, - expectedChangedEndpoints: sets.New[string](), + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "no change, multiple services, slices, IPs, and ports", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -726,7 +726,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeNSN("ns1", "ep1"): 2, makeNSN("ns2", "ep2"): 1, }, - expectedChangedEndpoints: sets.New[string](), + expectedChangedEndpoints: sets.New[types.NamespacedName](), }, { name: "add an EndpointSlice", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -748,7 +748,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "remove an EndpointSlice", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -769,7 +769,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "add an IP and port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -800,7 +800,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "remove an IP and port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -836,7 +836,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "add a slice to an endpoint", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -867,7 +867,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns1", "ep1"): 1, }, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "remove a slice from an endpoint", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -897,7 +897,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "rename a port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -924,7 +924,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "renumber a port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -949,7 +949,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "complex add and remove", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -1039,7 +1039,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedLocalEndpoints: map[types.NamespacedName]int{ makeNSN("ns4", "ep4"): 1, }, - expectedChangedEndpoints: sets.New[string]("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1"), makeNSN("ns2", "ep2"), makeNSN("ns3", "ep3"), makeNSN("ns4", "ep4")), }, { name: "change from 0 endpoint address to 1 unnamed port", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -1059,7 +1059,7 @@ func TestUpdateEndpointsMap(t *testing.T) { makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true, }, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "change from ready to terminating pod", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -1081,7 +1081,7 @@ func TestUpdateEndpointsMap(t *testing.T) { expectedDeletedUDPEndpoints: []ServiceEndpoint{}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, { name: "change from terminating to empty pod", previousEndpointSlices: []*discovery.EndpointSlice{ @@ -1102,7 +1102,7 @@ func TestUpdateEndpointsMap(t *testing.T) { }}, expectedNewlyActiveUDPServices: map[ServicePortName]bool{}, expectedLocalEndpoints: map[types.NamespacedName]int{}, - expectedChangedEndpoints: sets.New[string]("ns1/ep1"), + expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")), }, } @@ -1141,14 +1141,12 @@ func TestUpdateEndpointsMap(t *testing.T) { } } - pendingChanges := fp.endpointsChanges.PendingChanges() - if !pendingChanges.Equal(tc.expectedChangedEndpoints) { - t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList()) - } - result := fp.endpointsMap.Update(fp.endpointsChanges) newMap := fp.endpointsMap compareEndpointsMapsStr(t, newMap, tc.expectedResult) + if !result.UpdatedServices.Equal(tc.expectedChangedEndpoints) { + t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.UnsortedList(), result.UpdatedServices.UnsortedList()) + } if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) { t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints) } @@ -1335,14 +1333,13 @@ func TestEndpointSliceUpdate(t *testing.T) { fqdnSlice.AddressType = discovery.AddressTypeFQDN testCases := map[string]struct { - startingSlices []*discovery.EndpointSlice - endpointsChangeTracker *EndpointsChangeTracker - namespacedName types.NamespacedName - paramEndpointSlice *discovery.EndpointSlice - paramRemoveSlice bool - expectedReturnVal bool - expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo - expectedChangedEndpoints sets.Set[string] + startingSlices []*discovery.EndpointSlice + endpointsChangeTracker *EndpointsChangeTracker + namespacedName types.NamespacedName + paramEndpointSlice *discovery.EndpointSlice + paramRemoveSlice bool + expectedReturnVal bool + expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo }{ // test starting from an empty state "add a simple slice that doesn't already exist": { @@ -1364,33 +1361,30 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.1.3", port: 443, endpoint: "10.0.1.3:443", isLocal: false, ready: true, serving: true, terminating: false}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, // test no modification to state - current change should be nil as nothing changes "add the same slice that already exists": { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), - paramRemoveSlice: false, - expectedReturnVal: false, - expectedCurrentChange: nil, - expectedChangedEndpoints: sets.New[string](), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, }, // ensure that only valide address types are processed "add an FQDN slice (invalid address type)": { startingSlices: []*discovery.EndpointSlice{ generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: fqdnSlice, - paramRemoveSlice: false, - expectedReturnVal: false, - expectedCurrentChange: nil, - expectedChangedEndpoints: sets.New[string](), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: fqdnSlice, + paramRemoveSlice: false, + expectedReturnVal: false, + expectedCurrentChange: nil, }, // test additions to existing state "add a slice that overlaps with existing state": { @@ -1423,7 +1417,6 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, // test additions to existing state with partially overlapping slices and ports "add a slice that overlaps with existing state and partial ports": { @@ -1454,7 +1447,6 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, // test deletions from existing state with partially overlapping slices and ports "remove a slice that overlaps with existing state": { @@ -1477,7 +1469,6 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, // ensure a removal that has no effect turns into a no-op "remove a slice that doesn't even exist in current state": { @@ -1485,13 +1476,12 @@ func TestEndpointSliceUpdate(t *testing.T) { generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), }, - endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), - namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, - paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), - paramRemoveSlice: true, - expectedReturnVal: false, - expectedCurrentChange: nil, - expectedChangedEndpoints: sets.New[string](), + endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil), + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}), + paramRemoveSlice: true, + expectedReturnVal: false, + expectedCurrentChange: nil, }, // start with all endpoints ready, transition to no endpoints ready "transition all endpoints to unready state": { @@ -1515,7 +1505,6 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.1.3", port: 443, endpoint: "10.0.1.3:443", isLocal: true, ready: false, serving: false, terminating: false}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, // start with no endpoints ready, transition to all endpoints ready "transition all endpoints to ready state": { @@ -1537,7 +1526,6 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.1.2", port: 443, endpoint: "10.0.1.2:443", isLocal: true, ready: true, serving: true, terminating: false}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, // start with some endpoints ready, transition to more endpoints ready "transition some endpoints to ready state": { @@ -1566,7 +1554,6 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: false, serving: false, terminating: false}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, // start with some endpoints ready, transition to some terminating "transition some endpoints to terminating state": { @@ -1595,7 +1582,6 @@ func TestEndpointSliceUpdate(t *testing.T) { &BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: false, serving: false, terminating: true}, }, }, - expectedChangedEndpoints: sets.New[string]("ns1/svc1"), }, } @@ -1608,21 +1594,16 @@ func TestEndpointSliceUpdate(t *testing.T) { t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal) } - pendingChanges := tc.endpointsChangeTracker.PendingChanges() - if !pendingChanges.Equal(tc.expectedChangedEndpoints) { - t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList()) - } - changes := tc.endpointsChangeTracker.checkoutChanges() if tc.expectedCurrentChange == nil { if len(changes) != 0 { t.Errorf("Expected %s to have no changes", tc.namespacedName) } } else { - if len(changes) == 0 || changes[0] == nil { + if _, exists := changes[tc.namespacedName]; !exists { t.Fatalf("Expected %s to have changes", tc.namespacedName) } - compareEndpointsMapsStr(t, changes[0].current, tc.expectedCurrentChange) + compareEndpointsMapsStr(t, changes[tc.namespacedName].current, tc.expectedCurrentChange) } }) } @@ -1709,15 +1690,17 @@ func TestCheckoutChanges(t *testing.T) { t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes)) } - for i, change := range changes { - expectedChange := tc.expectedChanges[i] + for _, change := range changes { + // All of the test cases have 0 or 1 changes, so if we're + // here, then expectedChanges[0] is what we expect. + expectedChange := tc.expectedChanges[0] if !reflect.DeepEqual(change.previous, expectedChange.previous) { - t.Errorf("[%d] Expected change.previous: %+v, got: %+v", i, expectedChange.previous, change.previous) + t.Errorf("Expected change.previous: %+v, got: %+v", expectedChange.previous, change.previous) } if !reflect.DeepEqual(change.current, expectedChange.current) { - t.Errorf("[%d] Expected change.current: %+v, got: %+v", i, expectedChange.current, change.current) + t.Errorf("Expected change.current: %+v, got: %+v", expectedChange.current, change.current) } } }) diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 87e4305e60c..125d625cad2 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -188,25 +188,10 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint return changed } -// pendingChanges returns a set whose keys are the names of the services whose endpoints -// have changed since the last time checkoutChanges was called -func (cache *EndpointSliceCache) pendingChanges() sets.Set[string] { - cache.lock.Lock() - defer cache.lock.Unlock() - - changes := sets.New[string]() - for serviceNN, esTracker := range cache.trackerByServiceMap { - if len(esTracker.pending) > 0 { - changes.Insert(serviceNN.String()) - } - } - return changes -} - -// checkoutChanges returns a list of all endpointsChanges that are +// checkoutChanges returns a map of all endpointsChanges that are // pending and then marks them as applied. -func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange { - changes := []*endpointsChange{} +func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*endpointsChange { + changes := make(map[types.NamespacedName]*endpointsChange) cache.lock.Lock() defer cache.lock.Unlock() @@ -231,7 +216,7 @@ func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange { } change.current = cache.getEndpointsMap(serviceNN, esTracker.applied) - changes = append(changes, change) + changes[serviceNN] = change } return changes diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1109e096324..2736226e65b 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -793,11 +793,6 @@ func (proxier *Proxier) syncProxyRules() { klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start)) }() - var serviceChanged, endpointsChanged sets.Set[string] - if tryPartialSync { - serviceChanged = proxier.serviceChanges.PendingChanges() - endpointsChanged = proxier.endpointsChanges.PendingChanges() - } serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) @@ -1189,7 +1184,7 @@ func (proxier *Proxier) syncProxyRules() { // then we can omit them from the restore input. However, we have to still // figure out how many chains we _would_ have written, to make the metrics // come out right, so we just compute them and throw them away. - if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) { + if tryPartialSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) { natChains = skippedNatChains natRules = skippedNatRules } diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 6eeefeca245..5c2cbff472c 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -331,22 +331,12 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool { return len(sct.items) > 0 } -// PendingChanges returns a set whose keys are the names of the services that have changed -// since the last time sct was used to update a ServiceMap. (You must call this _before_ -// calling sm.Update(sct).) -func (sct *ServiceChangeTracker) PendingChanges() sets.Set[string] { - sct.lock.Lock() - defer sct.lock.Unlock() - - changes := sets.New[string]() - for name := range sct.items { - changes.Insert(name.String()) - } - return changes -} - // UpdateServiceMapResult is the updated results after applying service changes. type UpdateServiceMapResult struct { + // UpdatedServices lists the names of all services added/updated/deleted since the + // last Update. + UpdatedServices sets.Set[types.NamespacedName] + // DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs // that had UDP ports. Callers can use this to abort timeout-waits or clear // connection-tracking information. @@ -410,13 +400,16 @@ func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResul defer sct.lock.Unlock() result := UpdateServiceMapResult{ + UpdatedServices: sets.New[types.NamespacedName](), DeletedUDPClusterIPs: sets.New[string](), } - for _, change := range sct.items { + for nn, change := range sct.items { if sct.processServiceMapChange != nil { sct.processServiceMapChange(change.previous, change.current) } + result.UpdatedServices.Insert(nn) + sm.merge(change.current) // filter out the Update event of current changes from previous changes // before calling unmerge() so that can skip deleting the Update events. diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index f21b268dff3..864526aaea3 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -650,15 +650,14 @@ func TestServiceMapUpdateHeadless(t *testing.T) { ) // Headless service should be ignored - pending := fp.serviceChanges.PendingChanges() - if pending.Len() != 0 { - t.Errorf("expected 0 pending service changes, got %d", pending.Len()) - } result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap)) } + if len(result.UpdatedServices) != 0 { + t.Errorf("expected 0 updated services, got %d", len(result.UpdatedServices)) + } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) } @@ -682,14 +681,13 @@ func TestUpdateServiceTypeExternalName(t *testing.T) { }), ) - pending := fp.serviceChanges.PendingChanges() - if pending.Len() != 0 { - t.Errorf("expected 0 pending service changes, got %d", pending.Len()) - } result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 0 { t.Errorf("expected service map length 0, got %v", fp.svcPortMap) } + if len(result.UpdatedServices) != 0 { + t.Errorf("expected 0 updated services, got %v", result.UpdatedServices) + } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs) } @@ -749,21 +747,19 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.addService(services[i]) } - pending := fp.serviceChanges.PendingChanges() - for i := range services { - name := services[i].Namespace + "/" + services[i].Name - if !pending.Has(name) { - t.Errorf("expected pending change for %q", name) - } - } - if pending.Len() != len(services) { - t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len()) - } - result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 8 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } + for i := range services { + name := makeNSN(services[i].Namespace, services[i].Name) + if !result.UpdatedServices.Has(name) { + t.Errorf("expected updated service for %q", name) + } + } + if len(result.UpdatedServices) != len(services) { + t.Errorf("expected %d updated services, got %d", len(services), len(result.UpdatedServices)) + } if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) @@ -793,14 +789,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) { fp.deleteService(services[2]) fp.deleteService(services[3]) - pending = fp.serviceChanges.PendingChanges() - if pending.Len() != 4 { - t.Errorf("expected 4 pending service changes, got %d", pending.Len()) - } result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 1 { t.Errorf("expected service map length 1, got %v", fp.svcPortMap) } + if len(result.UpdatedServices) != 4 { + t.Errorf("expected 4 updated services, got %d", len(result.UpdatedServices)) + } healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts() if len(healthCheckNodePorts) != 0 { @@ -847,14 +842,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { fp.addService(servicev1) - pending := fp.serviceChanges.PendingChanges() - if pending.Len() != 1 { - t.Errorf("expected 1 pending service change, got %d", pending.Len()) - } result := fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } + if len(result.UpdatedServices) != 1 { + t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices)) + } if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs)) @@ -867,14 +861,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // Change service to load-balancer fp.updateService(servicev1, servicev2) - pending = fp.serviceChanges.PendingChanges() - if pending.Len() != 1 { - t.Errorf("expected 1 pending service change, got %d", pending.Len()) - } result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } + if len(result.UpdatedServices) != 1 { + t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices)) + } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } @@ -887,14 +880,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // No change; make sure the service map stays the same and there are // no health-check changes fp.updateService(servicev2, servicev2) - pending = fp.serviceChanges.PendingChanges() - if pending.Len() != 0 { - t.Errorf("expected 0 pending service changes, got %d", pending.Len()) - } result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } + if len(result.UpdatedServices) != 0 { + t.Errorf("expected 0 updated services, got %d", len(result.UpdatedServices)) + } if len(result.DeletedUDPClusterIPs) != 0 { t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList()) } @@ -906,14 +898,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // And back to ClusterIP fp.updateService(servicev2, servicev1) - pending = fp.serviceChanges.PendingChanges() - if pending.Len() != 1 { - t.Errorf("expected 1 pending service change, got %d", pending.Len()) - } result = fp.svcPortMap.Update(fp.serviceChanges) if len(fp.svcPortMap) != 2 { t.Errorf("expected service map length 2, got %v", fp.svcPortMap) } + if len(result.UpdatedServices) != 1 { + t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices)) + } if len(result.DeletedUDPClusterIPs) != 0 { // Services only added, so nothing stale yet t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))