diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 77bbc206931..089bf5a3710 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -65,26 +65,10 @@ type endpointSliceTracker struct { // corresponding EndpointSlices. type endpointSliceDataByName map[string]*endpointSliceData -// endpointSliceData contains just the attributes kube-proxy cares about. -// Used for caching. Intentionally small to limit memory util. +// endpointSliceData contains information about a single EndpointSlice update or removal. type endpointSliceData struct { - Ports []discovery.EndpointPort - Endpoints []*endpointData - Remove bool -} - -// endpointData contains just the attributes kube-proxy cares about. -// Used for caching. Intentionally small to limit memory util. -// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints. -type endpointData struct { - Addresses []string - NodeName *string - Zone *string - ZoneHints sets.Set[string] - - Ready bool - Serving bool - Terminating bool + endpointSlice *discovery.EndpointSlice + remove bool } // NewEndpointSliceCache initializes an EndpointSliceCache. @@ -109,43 +93,6 @@ func newEndpointSliceTracker() *endpointSliceTracker { } } -// newEndpointSliceData generates endpointSliceData from an EndpointSlice. -func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData { - esData := &endpointSliceData{ - Ports: endpointSlice.Ports, - Endpoints: []*endpointData{}, - Remove: remove, - } - - if !remove { - for _, endpoint := range endpointSlice.Endpoints { - epData := &endpointData{ - Addresses: endpoint.Addresses, - Zone: endpoint.Zone, - NodeName: endpoint.NodeName, - - // conditions - Ready: endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready, - Serving: endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving, - Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating, - } - - if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { - if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { - epData.ZoneHints = sets.New[string]() - for _, zone := range endpoint.Hints.ForZones { - epData.ZoneHints.Insert(zone.Name) - } - } - } - - esData.Endpoints = append(esData.Endpoints, epData) - } - } - - return esData -} - // standardEndpointInfo is the default makeEndpointFunc. func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint { return ep @@ -159,7 +106,7 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint return false } - esData := newEndpointSliceData(endpointSlice, remove) + esData := &endpointSliceData{endpointSlice, remove} cache.lock.Lock() defer cache.lock.Unlock() @@ -195,7 +142,7 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) for name, sliceData := range esTracker.pending { - if sliceData.Remove { + if sliceData.remove { delete(esTracker.applied, name) } else { esTracker.applied[name] = sliceData @@ -226,7 +173,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names endpointInfoBySP := spToEndpointMap{} for _, sliceData := range sliceDataByName { - for _, port := range sliceData.Ports { + for _, port := range sliceData.endpointSlice.Ports { if port.Name == nil { klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name) continue @@ -243,15 +190,15 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names Protocol: *port.Protocol, } - endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints) + endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.endpointSlice.Endpoints) } } return endpointInfoBySP } -// addEndpoints adds endpointInfo for each unique endpoint. -func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint { +// addEndpoints adds an Endpoint for each unique endpoint. +func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []discovery.Endpoint) map[string]Endpoint { if endpointSet == nil { endpointSet = map[string]Endpoint{} } @@ -274,8 +221,22 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) + ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready + serving := endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving + terminating := endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating + + var zoneHints sets.Set[string] + if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { + if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { + zoneHints = sets.New[string]() + for _, zone := range endpoint.Hints.ForZones { + zoneHints.Insert(zone.Name) + } + } + } + endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, - endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints) + ready, serving, terminating, zoneHints) // This logic ensures we're deduplicating potential overlapping endpoints // isLocal should not vary between matching endpoints, but if it does, we @@ -314,7 +275,7 @@ func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName, // If this is marked for removal and does not exist in the cache, no changes // are necessary. - if esData.Remove { + if esData.remove { return false } diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 0aa032d8274..c51203c1357 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -454,7 +454,7 @@ func TestEsDataChanged(t *testing.T) { t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err) } - esData := newEndpointSliceData(tc.updatedSlice, false) + esData := &endpointSliceData{tc.updatedSlice, false} changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData) if tc.expectChanged != changed {