From 05e14799db8e4835d686d5b604ea4c9f5708120d Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Thu, 15 Jun 2023 08:17:02 -0400 Subject: [PATCH] Cache EndpointSlices directly rather than copying the data Given that we are no longer modifying any of the EndpointSlice data, we can just work with pointers to the actual EndpointSlice objects. (The informer cache is already holding on to them, so they'll be taking up memory whether or not the EndpointSliceCache points to them.) --- pkg/proxy/endpointslicecache.go | 89 ++++++++-------------------- pkg/proxy/endpointslicecache_test.go | 2 +- 2 files changed, 26 insertions(+), 65 deletions(-) diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 77bbc206931..089bf5a3710 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -65,26 +65,10 @@ type endpointSliceTracker struct { // corresponding EndpointSlices. type endpointSliceDataByName map[string]*endpointSliceData -// endpointSliceData contains just the attributes kube-proxy cares about. -// Used for caching. Intentionally small to limit memory util. +// endpointSliceData contains information about a single EndpointSlice update or removal. type endpointSliceData struct { - Ports []discovery.EndpointPort - Endpoints []*endpointData - Remove bool -} - -// endpointData contains just the attributes kube-proxy cares about. -// Used for caching. Intentionally small to limit memory util. -// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints. -type endpointData struct { - Addresses []string - NodeName *string - Zone *string - ZoneHints sets.Set[string] - - Ready bool - Serving bool - Terminating bool + endpointSlice *discovery.EndpointSlice + remove bool } // NewEndpointSliceCache initializes an EndpointSliceCache. @@ -109,43 +93,6 @@ func newEndpointSliceTracker() *endpointSliceTracker { } } -// newEndpointSliceData generates endpointSliceData from an EndpointSlice. -func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData { - esData := &endpointSliceData{ - Ports: endpointSlice.Ports, - Endpoints: []*endpointData{}, - Remove: remove, - } - - if !remove { - for _, endpoint := range endpointSlice.Endpoints { - epData := &endpointData{ - Addresses: endpoint.Addresses, - Zone: endpoint.Zone, - NodeName: endpoint.NodeName, - - // conditions - Ready: endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready, - Serving: endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving, - Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating, - } - - if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { - if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { - epData.ZoneHints = sets.New[string]() - for _, zone := range endpoint.Hints.ForZones { - epData.ZoneHints.Insert(zone.Name) - } - } - } - - esData.Endpoints = append(esData.Endpoints, epData) - } - } - - return esData -} - // standardEndpointInfo is the default makeEndpointFunc. func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint { return ep @@ -159,7 +106,7 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint return false } - esData := newEndpointSliceData(endpointSlice, remove) + esData := &endpointSliceData{endpointSlice, remove} cache.lock.Lock() defer cache.lock.Unlock() @@ -195,7 +142,7 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) for name, sliceData := range esTracker.pending { - if sliceData.Remove { + if sliceData.remove { delete(esTracker.applied, name) } else { esTracker.applied[name] = sliceData @@ -226,7 +173,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names endpointInfoBySP := spToEndpointMap{} for _, sliceData := range sliceDataByName { - for _, port := range sliceData.Ports { + for _, port := range sliceData.endpointSlice.Ports { if port.Name == nil { klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name) continue @@ -243,15 +190,15 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names Protocol: *port.Protocol, } - endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints) + endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.endpointSlice.Endpoints) } } return endpointInfoBySP } -// addEndpoints adds endpointInfo for each unique endpoint. -func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint { +// addEndpoints adds an Endpoint for each unique endpoint. +func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []discovery.Endpoint) map[string]Endpoint { if endpointSet == nil { endpointSet = map[string]Endpoint{} } @@ -274,8 +221,22 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) + ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready + serving := endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving + terminating := endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating + + var zoneHints sets.Set[string] + if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { + if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { + zoneHints = sets.New[string]() + for _, zone := range endpoint.Hints.ForZones { + zoneHints.Insert(zone.Name) + } + } + } + endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, - endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints) + ready, serving, terminating, zoneHints) // This logic ensures we're deduplicating potential overlapping endpoints // isLocal should not vary between matching endpoints, but if it does, we @@ -314,7 +275,7 @@ func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName, // If this is marked for removal and does not exist in the cache, no changes // are necessary. - if esData.Remove { + if esData.remove { return false } diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 0aa032d8274..c51203c1357 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -454,7 +454,7 @@ func TestEsDataChanged(t *testing.T) { t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err) } - esData := newEndpointSliceData(tc.updatedSlice, false) + esData := &endpointSliceData{tc.updatedSlice, false} changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData) if tc.expectChanged != changed {