Cache EndpointSlices directly rather than copying the data

Given that we are no longer modifying any of the EndpointSlice data,
we can just work with pointers to the actual EndpointSlice objects.
(The informer cache is already holding on to them, so they'll be
taking up memory whether or not the EndpointSliceCache points to
them.)
This commit is contained in:
Dan Winship 2023-06-15 08:17:02 -04:00
parent f956fdf240
commit 05e14799db
2 changed files with 26 additions and 65 deletions

View File

@ -65,26 +65,10 @@ type endpointSliceTracker struct {
// corresponding EndpointSlices. // corresponding EndpointSlices.
type endpointSliceDataByName map[string]*endpointSliceData type endpointSliceDataByName map[string]*endpointSliceData
// endpointSliceData contains just the attributes kube-proxy cares about. // endpointSliceData contains information about a single EndpointSlice update or removal.
// Used for caching. Intentionally small to limit memory util.
type endpointSliceData struct { type endpointSliceData struct {
Ports []discovery.EndpointPort endpointSlice *discovery.EndpointSlice
Endpoints []*endpointData remove bool
Remove bool
}
// endpointData contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints.
type endpointData struct {
Addresses []string
NodeName *string
Zone *string
ZoneHints sets.Set[string]
Ready bool
Serving bool
Terminating bool
} }
// NewEndpointSliceCache initializes an EndpointSliceCache. // NewEndpointSliceCache initializes an EndpointSliceCache.
@ -109,43 +93,6 @@ func newEndpointSliceTracker() *endpointSliceTracker {
} }
} }
// newEndpointSliceData generates endpointSliceData from an EndpointSlice.
func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData {
esData := &endpointSliceData{
Ports: endpointSlice.Ports,
Endpoints: []*endpointData{},
Remove: remove,
}
if !remove {
for _, endpoint := range endpointSlice.Endpoints {
epData := &endpointData{
Addresses: endpoint.Addresses,
Zone: endpoint.Zone,
NodeName: endpoint.NodeName,
// conditions
Ready: endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready,
Serving: endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving,
Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating,
}
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
epData.ZoneHints = sets.New[string]()
for _, zone := range endpoint.Hints.ForZones {
epData.ZoneHints.Insert(zone.Name)
}
}
}
esData.Endpoints = append(esData.Endpoints, epData)
}
}
return esData
}
// standardEndpointInfo is the default makeEndpointFunc. // standardEndpointInfo is the default makeEndpointFunc.
func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint { func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint {
return ep return ep
@ -159,7 +106,7 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return false return false
} }
esData := newEndpointSliceData(endpointSlice, remove) esData := &endpointSliceData{endpointSlice, remove}
cache.lock.Lock() cache.lock.Lock()
defer cache.lock.Unlock() defer cache.lock.Unlock()
@ -195,7 +142,7 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end
change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
for name, sliceData := range esTracker.pending { for name, sliceData := range esTracker.pending {
if sliceData.Remove { if sliceData.remove {
delete(esTracker.applied, name) delete(esTracker.applied, name)
} else { } else {
esTracker.applied[name] = sliceData esTracker.applied[name] = sliceData
@ -226,7 +173,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
endpointInfoBySP := spToEndpointMap{} endpointInfoBySP := spToEndpointMap{}
for _, sliceData := range sliceDataByName { for _, sliceData := range sliceDataByName {
for _, port := range sliceData.Ports { for _, port := range sliceData.endpointSlice.Ports {
if port.Name == nil { if port.Name == nil {
klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name) klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name)
continue continue
@ -243,15 +190,15 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
Protocol: *port.Protocol, Protocol: *port.Protocol,
} }
endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints) endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.endpointSlice.Endpoints)
} }
} }
return endpointInfoBySP return endpointInfoBySP
} }
// addEndpoints adds endpointInfo for each unique endpoint. // addEndpoints adds an Endpoint for each unique endpoint.
func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint { func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []discovery.Endpoint) map[string]Endpoint {
if endpointSet == nil { if endpointSet == nil {
endpointSet = map[string]Endpoint{} endpointSet = map[string]Endpoint{}
} }
@ -274,8 +221,22 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port
isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName)
ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
serving := endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving
terminating := endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating
var zoneHints sets.Set[string]
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
zoneHints = sets.New[string]()
for _, zone := range endpoint.Hints.ForZones {
zoneHints.Insert(zone.Name)
}
}
}
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal,
endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints) ready, serving, terminating, zoneHints)
// This logic ensures we're deduplicating potential overlapping endpoints // This logic ensures we're deduplicating potential overlapping endpoints
// isLocal should not vary between matching endpoints, but if it does, we // isLocal should not vary between matching endpoints, but if it does, we
@ -314,7 +275,7 @@ func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName,
// If this is marked for removal and does not exist in the cache, no changes // If this is marked for removal and does not exist in the cache, no changes
// are necessary. // are necessary.
if esData.Remove { if esData.remove {
return false return false
} }

View File

@ -454,7 +454,7 @@ func TestEsDataChanged(t *testing.T) {
t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err) t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err)
} }
esData := newEndpointSliceData(tc.updatedSlice, false) esData := &endpointSliceData{tc.updatedSlice, false}
changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData) changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData)
if tc.expectChanged != changed { if tc.expectChanged != changed {