Drop PendingChanges methods from change trackers, move into UpdateResults

This fixes a race condition where the tracker could be updated in
between us calling .PendingChanges() and .Update().
This commit is contained in:
Dan Winship 2023-12-06 12:16:10 -05:00
parent 5d0656b1f6
commit 626f349fef
6 changed files with 108 additions and 161 deletions

View File

@ -205,16 +205,9 @@ func (ect *EndpointsChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.
return changeNeeded
}
// PendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time ect was used to update an EndpointsMap. (You must call
// this _before_ calling em.Update(ect).)
func (ect *EndpointsChangeTracker) PendingChanges() sets.Set[string] {
return ect.endpointSliceCache.pendingChanges()
}
// checkoutChanges returns a list of pending endpointsChanges and marks them as
// checkoutChanges returns a map of pending endpointsChanges and marks them as
// applied.
func (ect *EndpointsChangeTracker) checkoutChanges() []*endpointsChange {
func (ect *EndpointsChangeTracker) checkoutChanges() map[types.NamespacedName]*endpointsChange {
metrics.EndpointChangesPending.Set(0)
return ect.endpointSliceCache.checkoutChanges()
@ -269,6 +262,10 @@ type endpointsChange struct {
// UpdateEndpointsMapResult is the updated results after applying endpoints changes.
type UpdateEndpointsMapResult struct {
// UpdatedServices lists the names of all services with added/updated/deleted
// endpoints since the last Update.
UpdatedServices sets.Set[types.NamespacedName]
// DeletedUDPEndpoints identifies UDP endpoints that have just been deleted.
// Existing conntrack NAT entries pointing to these endpoints must be deleted to
// ensure that no further traffic for the Service gets delivered to them.
@ -294,6 +291,7 @@ type EndpointsMap map[ServicePortName][]Endpoint
// changes map.
func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
result := UpdateEndpointsMapResult{
UpdatedServices: sets.New[types.NamespacedName](),
DeletedUDPEndpoints: make([]ServiceEndpoint, 0),
NewlyActiveUDPServices: make([]ServicePortName, 0),
LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
@ -303,10 +301,12 @@ func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapRes
}
changes := ect.checkoutChanges()
for _, change := range changes {
for nn, change := range changes {
if ect.processEndpointsMapChange != nil {
ect.processEndpointsMapChange(change.previous, change.current)
}
result.UpdatedServices.Insert(nn)
em.unmerge(change.previous)
em.merge(change.current)
detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)

View File

@ -533,7 +533,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedDeletedUDPEndpoints []ServiceEndpoint
expectedNewlyActiveUDPServices map[ServicePortName]bool
expectedLocalEndpoints map[types.NamespacedName]int
expectedChangedEndpoints sets.Set[string]
expectedChangedEndpoints sets.Set[types.NamespacedName]
}{{
name: "empty",
previousEndpointsMap: map[ServicePortName][]*BaseEndpointInfo{},
@ -541,7 +541,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string](),
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, unnamed port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -563,7 +563,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string](),
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, named port, local",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -587,7 +587,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.New[string](),
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, multiple slices",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -617,7 +617,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string](),
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, multiple slices, multiple ports, local",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -655,7 +655,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.New[string](),
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "no change, multiple services, slices, IPs, and ports",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -726,7 +726,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeNSN("ns1", "ep1"): 2,
makeNSN("ns2", "ep2"): 1,
},
expectedChangedEndpoints: sets.New[string](),
expectedChangedEndpoints: sets.New[types.NamespacedName](),
}, {
name: "add an EndpointSlice",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -748,7 +748,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "remove an EndpointSlice",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -769,7 +769,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "add an IP and port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -800,7 +800,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "remove an IP and port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -836,7 +836,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "add a slice to an endpoint",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -867,7 +867,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns1", "ep1"): 1,
},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "remove a slice from an endpoint",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -897,7 +897,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "rename a port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -924,7 +924,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeServicePortName("ns1", "ep1", "p11-2", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "renumber a port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -949,7 +949,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "complex add and remove",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -1039,7 +1039,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedLocalEndpoints: map[types.NamespacedName]int{
makeNSN("ns4", "ep4"): 1,
},
expectedChangedEndpoints: sets.New[string]("ns1/ep1", "ns2/ep2", "ns3/ep3", "ns4/ep4"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1"), makeNSN("ns2", "ep2"), makeNSN("ns3", "ep3"), makeNSN("ns4", "ep4")),
}, {
name: "change from 0 endpoint address to 1 unnamed port",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -1059,7 +1059,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
makeServicePortName("ns1", "ep1", "", v1.ProtocolUDP): true,
},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "change from ready to terminating pod",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -1081,7 +1081,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
expectedDeletedUDPEndpoints: []ServiceEndpoint{},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
}, {
name: "change from terminating to empty pod",
previousEndpointSlices: []*discovery.EndpointSlice{
@ -1102,7 +1102,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
}},
expectedNewlyActiveUDPServices: map[ServicePortName]bool{},
expectedLocalEndpoints: map[types.NamespacedName]int{},
expectedChangedEndpoints: sets.New[string]("ns1/ep1"),
expectedChangedEndpoints: sets.New(makeNSN("ns1", "ep1")),
},
}
@ -1141,14 +1141,12 @@ func TestUpdateEndpointsMap(t *testing.T) {
}
}
pendingChanges := fp.endpointsChanges.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList())
}
result := fp.endpointsMap.Update(fp.endpointsChanges)
newMap := fp.endpointsMap
compareEndpointsMapsStr(t, newMap, tc.expectedResult)
if !result.UpdatedServices.Equal(tc.expectedChangedEndpoints) {
t.Errorf("[%d] expected changed endpoints %q, got %q", tci, tc.expectedChangedEndpoints.UnsortedList(), result.UpdatedServices.UnsortedList())
}
if len(result.DeletedUDPEndpoints) != len(tc.expectedDeletedUDPEndpoints) {
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedDeletedUDPEndpoints), len(result.DeletedUDPEndpoints), result.DeletedUDPEndpoints)
}
@ -1335,14 +1333,13 @@ func TestEndpointSliceUpdate(t *testing.T) {
fqdnSlice.AddressType = discovery.AddressTypeFQDN
testCases := map[string]struct {
startingSlices []*discovery.EndpointSlice
endpointsChangeTracker *EndpointsChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
expectedChangedEndpoints sets.Set[string]
startingSlices []*discovery.EndpointSlice
endpointsChangeTracker *EndpointsChangeTracker
namespacedName types.NamespacedName
paramEndpointSlice *discovery.EndpointSlice
paramRemoveSlice bool
expectedReturnVal bool
expectedCurrentChange map[ServicePortName][]*BaseEndpointInfo
}{
// test starting from an empty state
"add a simple slice that doesn't already exist": {
@ -1364,33 +1361,30 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.1.3", port: 443, endpoint: "10.0.1.3:443", isLocal: false, ready: true, serving: true, terminating: false},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
// test no modification to state - current change should be nil as nothing changes
"add the same slice that already exists": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.New[string](),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
},
// ensure that only valide address types are processed
"add an FQDN slice (invalid address type)": {
startingSlices: []*discovery.EndpointSlice{
generateEndpointSlice("svc1", "ns1", 1, 3, 999, 999, []string{"host1", "host2"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.New[string](),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: fqdnSlice,
paramRemoveSlice: false,
expectedReturnVal: false,
expectedCurrentChange: nil,
},
// test additions to existing state
"add a slice that overlaps with existing state": {
@ -1423,7 +1417,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
// test additions to existing state with partially overlapping slices and ports
"add a slice that overlaps with existing state and partial ports": {
@ -1454,7 +1447,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
// test deletions from existing state with partially overlapping slices and ports
"remove a slice that overlaps with existing state": {
@ -1477,7 +1469,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
// ensure a removal that has no effect turns into a no-op
"remove a slice that doesn't even exist in current state": {
@ -1485,13 +1476,12 @@ func TestEndpointSliceUpdate(t *testing.T) {
generateEndpointSlice("svc1", "ns1", 1, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
generateEndpointSlice("svc1", "ns1", 2, 2, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
},
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
expectedChangedEndpoints: sets.New[string](),
endpointsChangeTracker: NewEndpointsChangeTracker("host1", nil, v1.IPv4Protocol, nil, nil),
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
paramEndpointSlice: generateEndpointSlice("svc1", "ns1", 3, 5, 999, 999, []string{"host1"}, []*int32{ptr.To[int32](80), ptr.To[int32](443)}),
paramRemoveSlice: true,
expectedReturnVal: false,
expectedCurrentChange: nil,
},
// start with all endpoints ready, transition to no endpoints ready
"transition all endpoints to unready state": {
@ -1515,7 +1505,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.1.3", port: 443, endpoint: "10.0.1.3:443", isLocal: true, ready: false, serving: false, terminating: false},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
// start with no endpoints ready, transition to all endpoints ready
"transition all endpoints to ready state": {
@ -1537,7 +1526,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.1.2", port: 443, endpoint: "10.0.1.2:443", isLocal: true, ready: true, serving: true, terminating: false},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
// start with some endpoints ready, transition to more endpoints ready
"transition some endpoints to ready state": {
@ -1566,7 +1554,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: false, serving: false, terminating: false},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
// start with some endpoints ready, transition to some terminating
"transition some endpoints to terminating state": {
@ -1595,7 +1582,6 @@ func TestEndpointSliceUpdate(t *testing.T) {
&BaseEndpointInfo{ip: "10.0.2.2", port: 443, endpoint: "10.0.2.2:443", isLocal: true, ready: false, serving: false, terminating: true},
},
},
expectedChangedEndpoints: sets.New[string]("ns1/svc1"),
},
}
@ -1608,21 +1594,16 @@ func TestEndpointSliceUpdate(t *testing.T) {
t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal)
}
pendingChanges := tc.endpointsChangeTracker.PendingChanges()
if !pendingChanges.Equal(tc.expectedChangedEndpoints) {
t.Errorf("expected changed endpoints %q, got %q", tc.expectedChangedEndpoints.UnsortedList(), pendingChanges.UnsortedList())
}
changes := tc.endpointsChangeTracker.checkoutChanges()
if tc.expectedCurrentChange == nil {
if len(changes) != 0 {
t.Errorf("Expected %s to have no changes", tc.namespacedName)
}
} else {
if len(changes) == 0 || changes[0] == nil {
if _, exists := changes[tc.namespacedName]; !exists {
t.Fatalf("Expected %s to have changes", tc.namespacedName)
}
compareEndpointsMapsStr(t, changes[0].current, tc.expectedCurrentChange)
compareEndpointsMapsStr(t, changes[tc.namespacedName].current, tc.expectedCurrentChange)
}
})
}
@ -1709,15 +1690,17 @@ func TestCheckoutChanges(t *testing.T) {
t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes))
}
for i, change := range changes {
expectedChange := tc.expectedChanges[i]
for _, change := range changes {
// All of the test cases have 0 or 1 changes, so if we're
// here, then expectedChanges[0] is what we expect.
expectedChange := tc.expectedChanges[0]
if !reflect.DeepEqual(change.previous, expectedChange.previous) {
t.Errorf("[%d] Expected change.previous: %+v, got: %+v", i, expectedChange.previous, change.previous)
t.Errorf("Expected change.previous: %+v, got: %+v", expectedChange.previous, change.previous)
}
if !reflect.DeepEqual(change.current, expectedChange.current) {
t.Errorf("[%d] Expected change.current: %+v, got: %+v", i, expectedChange.current, change.current)
t.Errorf("Expected change.current: %+v, got: %+v", expectedChange.current, change.current)
}
}
})

View File

@ -188,25 +188,10 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return changed
}
// pendingChanges returns a set whose keys are the names of the services whose endpoints
// have changed since the last time checkoutChanges was called
func (cache *EndpointSliceCache) pendingChanges() sets.Set[string] {
cache.lock.Lock()
defer cache.lock.Unlock()
changes := sets.New[string]()
for serviceNN, esTracker := range cache.trackerByServiceMap {
if len(esTracker.pending) > 0 {
changes.Insert(serviceNN.String())
}
}
return changes
}
// checkoutChanges returns a list of all endpointsChanges that are
// checkoutChanges returns a map of all endpointsChanges that are
// pending and then marks them as applied.
func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {
changes := []*endpointsChange{}
func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*endpointsChange {
changes := make(map[types.NamespacedName]*endpointsChange)
cache.lock.Lock()
defer cache.lock.Unlock()
@ -231,7 +216,7 @@ func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {
}
change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
changes = append(changes, change)
changes[serviceNN] = change
}
return changes

View File

@ -793,11 +793,6 @@ func (proxier *Proxier) syncProxyRules() {
klog.V(2).InfoS("SyncProxyRules complete", "elapsed", time.Since(start))
}()
var serviceChanged, endpointsChanged sets.Set[string]
if tryPartialSync {
serviceChanged = proxier.serviceChanges.PendingChanges()
endpointsChanged = proxier.endpointsChanges.PendingChanges()
}
serviceUpdateResult := proxier.svcPortMap.Update(proxier.serviceChanges)
endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges)
@ -1189,7 +1184,7 @@ func (proxier *Proxier) syncProxyRules() {
// then we can omit them from the restore input. However, we have to still
// figure out how many chains we _would_ have written, to make the metrics
// come out right, so we just compute them and throw them away.
if tryPartialSync && !serviceChanged.Has(svcName.NamespacedName.String()) && !endpointsChanged.Has(svcName.NamespacedName.String()) {
if tryPartialSync && !serviceUpdateResult.UpdatedServices.Has(svcName.NamespacedName) && !endpointUpdateResult.UpdatedServices.Has(svcName.NamespacedName) {
natChains = skippedNatChains
natRules = skippedNatRules
}

View File

@ -331,22 +331,12 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
return len(sct.items) > 0
}
// PendingChanges returns a set whose keys are the names of the services that have changed
// since the last time sct was used to update a ServiceMap. (You must call this _before_
// calling sm.Update(sct).)
func (sct *ServiceChangeTracker) PendingChanges() sets.Set[string] {
sct.lock.Lock()
defer sct.lock.Unlock()
changes := sets.New[string]()
for name := range sct.items {
changes.Insert(name.String())
}
return changes
}
// UpdateServiceMapResult is the updated results after applying service changes.
type UpdateServiceMapResult struct {
// UpdatedServices lists the names of all services added/updated/deleted since the
// last Update.
UpdatedServices sets.Set[types.NamespacedName]
// DeletedUDPClusterIPs holds stale (no longer assigned to a Service) Service IPs
// that had UDP ports. Callers can use this to abort timeout-waits or clear
// connection-tracking information.
@ -410,13 +400,16 @@ func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResul
defer sct.lock.Unlock()
result := UpdateServiceMapResult{
UpdatedServices: sets.New[types.NamespacedName](),
DeletedUDPClusterIPs: sets.New[string](),
}
for _, change := range sct.items {
for nn, change := range sct.items {
if sct.processServiceMapChange != nil {
sct.processServiceMapChange(change.previous, change.current)
}
result.UpdatedServices.Insert(nn)
sm.merge(change.current)
// filter out the Update event of current changes from previous changes
// before calling unmerge() so that can skip deleting the Update events.

View File

@ -650,15 +650,14 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
)
// Headless service should be ignored
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %d", len(fp.svcPortMap))
}
if len(result.UpdatedServices) != 0 {
t.Errorf("expected 0 updated services, got %d", len(result.UpdatedServices))
}
if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
}
@ -682,14 +681,13 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
}),
)
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 0 {
t.Errorf("expected service map length 0, got %v", fp.svcPortMap)
}
if len(result.UpdatedServices) != 0 {
t.Errorf("expected 0 updated services, got %v", result.UpdatedServices)
}
if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs)
}
@ -749,21 +747,19 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
fp.addService(services[i])
}
pending := fp.serviceChanges.PendingChanges()
for i := range services {
name := services[i].Namespace + "/" + services[i].Name
if !pending.Has(name) {
t.Errorf("expected pending change for %q", name)
}
}
if pending.Len() != len(services) {
t.Errorf("expected %d pending service changes, got %d", len(services), pending.Len())
}
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 8 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
for i := range services {
name := makeNSN(services[i].Namespace, services[i].Name)
if !result.UpdatedServices.Has(name) {
t.Errorf("expected updated service for %q", name)
}
}
if len(result.UpdatedServices) != len(services) {
t.Errorf("expected %d updated services, got %d", len(services), len(result.UpdatedServices))
}
if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
@ -793,14 +789,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
fp.deleteService(services[2])
fp.deleteService(services[3])
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 4 {
t.Errorf("expected 4 pending service changes, got %d", pending.Len())
}
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 1 {
t.Errorf("expected service map length 1, got %v", fp.svcPortMap)
}
if len(result.UpdatedServices) != 4 {
t.Errorf("expected 4 updated services, got %d", len(result.UpdatedServices))
}
healthCheckNodePorts = fp.svcPortMap.HealthCheckNodePorts()
if len(healthCheckNodePorts) != 0 {
@ -847,14 +842,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
fp.addService(servicev1)
pending := fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result := fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.UpdatedServices) != 1 {
t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices))
}
if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))
@ -867,14 +861,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// Change service to load-balancer
fp.updateService(servicev1, servicev2)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.UpdatedServices) != 1 {
t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices))
}
if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
}
@ -887,14 +880,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// No change; make sure the service map stays the same and there are
// no health-check changes
fp.updateService(servicev2, servicev2)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 0 {
t.Errorf("expected 0 pending service changes, got %d", pending.Len())
}
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.UpdatedServices) != 0 {
t.Errorf("expected 0 updated services, got %d", len(result.UpdatedServices))
}
if len(result.DeletedUDPClusterIPs) != 0 {
t.Errorf("expected stale UDP services length 0, got %v", result.DeletedUDPClusterIPs.UnsortedList())
}
@ -906,14 +898,13 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
// And back to ClusterIP
fp.updateService(servicev2, servicev1)
pending = fp.serviceChanges.PendingChanges()
if pending.Len() != 1 {
t.Errorf("expected 1 pending service change, got %d", pending.Len())
}
result = fp.svcPortMap.Update(fp.serviceChanges)
if len(fp.svcPortMap) != 2 {
t.Errorf("expected service map length 2, got %v", fp.svcPortMap)
}
if len(result.UpdatedServices) != 1 {
t.Errorf("expected 1 updated service, got %d", len(result.UpdatedServices))
}
if len(result.DeletedUDPClusterIPs) != 0 {
// Services only added, so nothing stale yet
t.Errorf("expected stale UDP services length 0, got %d", len(result.DeletedUDPClusterIPs))