Squash some unnecessarily-chained methods in the change trackers

ServicePortMap.Update() and EndpointsMap.Update() were just a tiny
wrappers around the corresponding apply() methods, which had no other
callers. So squash them together.

(Also fix the variable naming in ServicePortMap.Update() to match
other methods.)
This commit is contained in:
Dan Winship 2023-12-06 12:18:46 -05:00
parent 755b4e2bc4
commit 5d0656b1f6
2 changed files with 34 additions and 39 deletions

View File

@ -286,28 +286,20 @@ type UpdateEndpointsMapResult struct {
LastChangeTriggerTimes map[types.NamespacedName][]time.Time
}
// Update updates endpointsMap base on the given changes.
func (em EndpointsMap) Update(changes *EndpointsChangeTracker) (result UpdateEndpointsMapResult) {
result.DeletedUDPEndpoints = make([]ServiceEndpoint, 0)
result.NewlyActiveUDPServices = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
em.apply(changes, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices, &result.LastChangeTriggerTimes)
return result
}
// EndpointsMap maps a service name to a list of all its Endpoints.
type EndpointsMap map[ServicePortName][]Endpoint
// apply the changes to EndpointsMap, update the passed-in stale-conntrack-entry arrays,
// and clear the changes map. In addition it returns (via argument) and resets the
// lastChangeTriggerTimes for all endpoints that were changed and will result in syncing
// the proxy rules. apply triggers processEndpointsMapChange on every change.
func (em EndpointsMap) apply(ect *EndpointsChangeTracker, deletedUDPEndpoints *[]ServiceEndpoint,
newlyActiveUDPServices *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
// Update updates em based on the changes in ect, returns information about the diff since
// the last Update, triggers processEndpointsMapChange on every change, and clears the
// changes map.
func (em EndpointsMap) Update(ect *EndpointsChangeTracker) UpdateEndpointsMapResult {
result := UpdateEndpointsMapResult{
DeletedUDPEndpoints: make([]ServiceEndpoint, 0),
NewlyActiveUDPServices: make([]ServicePortName, 0),
LastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
}
if ect == nil {
return
return result
}
changes := ect.checkoutChanges()
@ -317,9 +309,11 @@ func (em EndpointsMap) apply(ect *EndpointsChangeTracker, deletedUDPEndpoints *[
}
em.unmerge(change.previous)
em.merge(change.current)
detectStaleConntrackEntries(change.previous, change.current, deletedUDPEndpoints, newlyActiveUDPServices)
detectStaleConntrackEntries(change.previous, change.current, &result.DeletedUDPEndpoints, &result.NewlyActiveUDPServices)
}
ect.checkoutTriggerTimes(lastChangeTriggerTimes)
ect.checkoutTriggerTimes(&result.LastChangeTriggerTimes)
return result
}
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.

View File

@ -353,13 +353,6 @@ type UpdateServiceMapResult struct {
DeletedUDPClusterIPs sets.Set[string]
}
// Update updates ServicePortMap base on the given changes.
func (sm ServicePortMap) Update(changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
result.DeletedUDPClusterIPs = sets.New[string]()
sm.apply(changes, result.DeletedUDPClusterIPs)
return result
}
// HealthCheckNodePorts returns a map of Service names to HealthCheckNodePort values
// for all Services in sm with non-zero HealthCheckNodePort.
func (sm ServicePortMap) HealthCheckNodePorts() map[types.NamespacedName]uint16 {
@ -409,24 +402,32 @@ func (sct *ServiceChangeTracker) serviceToServiceMap(service *v1.Service) Servic
return svcPortMap
}
// apply the changes to ServicePortMap and update the deleted UDP cluster IP set.
// apply triggers processServiceMapChange on every change.
func (sm *ServicePortMap) apply(changes *ServiceChangeTracker, deletedUDPClusterIPs sets.Set[string]) {
changes.lock.Lock()
defer changes.lock.Unlock()
for _, change := range changes.items {
if changes.processServiceMapChange != nil {
changes.processServiceMapChange(change.previous, change.current)
// Update updates ServicePortMap base on the given changes, returns information about the
// diff since the last Update, triggers processServiceMapChange on every change, and
// clears the changes map.
func (sm ServicePortMap) Update(sct *ServiceChangeTracker) UpdateServiceMapResult {
sct.lock.Lock()
defer sct.lock.Unlock()
result := UpdateServiceMapResult{
DeletedUDPClusterIPs: sets.New[string](),
}
for _, change := range sct.items {
if sct.processServiceMapChange != nil {
sct.processServiceMapChange(change.previous, change.current)
}
sm.merge(change.current)
// filter out the Update event of current changes from previous changes before calling unmerge() so that can
// skip deleting the Update events.
// filter out the Update event of current changes from previous changes
// before calling unmerge() so that can skip deleting the Update events.
change.previous.filter(change.current)
sm.unmerge(change.previous, deletedUDPClusterIPs)
sm.unmerge(change.previous, result.DeletedUDPClusterIPs)
}
// clear changes after applying them to ServicePortMap.
changes.items = make(map[types.NamespacedName]*serviceChange)
sct.items = make(map[types.NamespacedName]*serviceChange)
metrics.ServiceChangesPending.Set(0)
return result
}
// merge adds other ServicePortMap's elements to current ServicePortMap.