Merge pull request #124626 from danwinship/endpointslicecache

simplify EndpointSliceCache caching
This commit is contained in:
Kubernetes Prow Robot
2024-05-07 16:59:42 -07:00
committed by GitHub
2 changed files with 28 additions and 100 deletions

View File

@@ -20,7 +20,6 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
"strings"
"sync" "sync"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@@ -66,26 +65,10 @@ type endpointSliceTracker struct {
// corresponding EndpointSlices. // corresponding EndpointSlices.
type endpointSliceDataByName map[string]*endpointSliceData type endpointSliceDataByName map[string]*endpointSliceData
// endpointSliceData contains just the attributes kube-proxy cares about. // endpointSliceData contains information about a single EndpointSlice update or removal.
// Used for caching. Intentionally small to limit memory util.
type endpointSliceData struct { type endpointSliceData struct {
Ports []discovery.EndpointPort endpointSlice *discovery.EndpointSlice
Endpoints []*endpointData remove bool
Remove bool
}
// endpointData contains just the attributes kube-proxy cares about.
// Used for caching. Intentionally small to limit memory util.
// Addresses, NodeName, and Zone are copied from EndpointSlice Endpoints.
type endpointData struct {
Addresses []string
NodeName *string
Zone *string
ZoneHints sets.Set[string]
Ready bool
Serving bool
Terminating bool
} }
// NewEndpointSliceCache initializes an EndpointSliceCache. // NewEndpointSliceCache initializes an EndpointSliceCache.
@@ -110,49 +93,6 @@ func newEndpointSliceTracker() *endpointSliceTracker {
} }
} }
// newEndpointSliceData generates endpointSliceData from an EndpointSlice.
func newEndpointSliceData(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceData {
esData := &endpointSliceData{
Ports: make([]discovery.EndpointPort, len(endpointSlice.Ports)),
Endpoints: []*endpointData{},
Remove: remove,
}
// copy here to avoid mutating shared EndpointSlice object.
copy(esData.Ports, endpointSlice.Ports)
sort.Sort(byPort(esData.Ports))
if !remove {
for _, endpoint := range endpointSlice.Endpoints {
epData := &endpointData{
Addresses: endpoint.Addresses,
Zone: endpoint.Zone,
NodeName: endpoint.NodeName,
// conditions
Ready: endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready,
Serving: endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving,
Terminating: endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating,
}
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
epData.ZoneHints = sets.New[string]()
for _, zone := range endpoint.Hints.ForZones {
epData.ZoneHints.Insert(zone.Name)
}
}
}
esData.Endpoints = append(esData.Endpoints, epData)
}
sort.Sort(byAddress(esData.Endpoints))
}
return esData
}
// standardEndpointInfo is the default makeEndpointFunc. // standardEndpointInfo is the default makeEndpointFunc.
func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint { func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint {
return ep return ep
@@ -166,7 +106,7 @@ func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.Endpoint
return false return false
} }
esData := newEndpointSliceData(endpointSlice, remove) esData := &endpointSliceData{endpointSlice, remove}
cache.lock.Lock() cache.lock.Lock()
defer cache.lock.Unlock() defer cache.lock.Unlock()
@@ -202,7 +142,7 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end
change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
for name, sliceData := range esTracker.pending { for name, sliceData := range esTracker.pending {
if sliceData.Remove { if sliceData.remove {
delete(esTracker.applied, name) delete(esTracker.applied, name)
} else { } else {
esTracker.applied[name] = sliceData esTracker.applied[name] = sliceData
@@ -233,7 +173,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
endpointInfoBySP := spToEndpointMap{} endpointInfoBySP := spToEndpointMap{}
for _, sliceData := range sliceDataByName { for _, sliceData := range sliceDataByName {
for _, port := range sliceData.Ports { for _, port := range sliceData.endpointSlice.Ports {
if port.Name == nil { if port.Name == nil {
klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name) klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name)
continue continue
@@ -250,15 +190,15 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
Protocol: *port.Protocol, Protocol: *port.Protocol,
} }
endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.Endpoints) endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceData.endpointSlice.Endpoints)
} }
} }
return endpointInfoBySP return endpointInfoBySP
} }
// addEndpoints adds endpointInfo for each unique endpoint. // addEndpoints adds an Endpoint for each unique endpoint.
func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointData) map[string]Endpoint { func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []discovery.Endpoint) map[string]Endpoint {
if endpointSet == nil { if endpointSet == nil {
endpointSet = map[string]Endpoint{} endpointSet = map[string]Endpoint{}
} }
@@ -281,8 +221,22 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port
isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName) isLocal := endpoint.NodeName != nil && cache.isLocal(*endpoint.NodeName)
ready := endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready
serving := endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving
terminating := endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating
var zoneHints sets.Set[string]
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
zoneHints = sets.New[string]()
for _, zone := range endpoint.Hints.ForZones {
zoneHints.Insert(zone.Name)
}
}
}
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal,
endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints) ready, serving, terminating, zoneHints)
// This logic ensures we're deduplicating potential overlapping endpoints // This logic ensures we're deduplicating potential overlapping endpoints
// isLocal should not vary between matching endpoints, but if it does, we // isLocal should not vary between matching endpoints, but if it does, we
@@ -321,7 +275,7 @@ func (cache *EndpointSliceCache) esDataChanged(serviceKey types.NamespacedName,
// If this is marked for removal and does not exist in the cache, no changes // If this is marked for removal and does not exist in the cache, no changes
// are necessary. // are necessary.
if esData.Remove { if esData.remove {
return false return false
} }
@@ -373,19 +327,6 @@ func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.Names
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
} }
// byAddress helps sort endpointData
type byAddress []*endpointData
func (e byAddress) Len() int {
return len(e)
}
func (e byAddress) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
func (e byAddress) Less(i, j int) bool {
return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",")
}
// byEndpoint helps sort endpoints by endpoint string. // byEndpoint helps sort endpoints by endpoint string.
type byEndpoint []Endpoint type byEndpoint []Endpoint
@@ -398,16 +339,3 @@ func (e byEndpoint) Swap(i, j int) {
func (e byEndpoint) Less(i, j int) bool { func (e byEndpoint) Less(i, j int) bool {
return e[i].String() < e[j].String() return e[i].String() < e[j].String()
} }
// byPort helps sort EndpointSlice ports by port number
type byPort []discovery.EndpointPort
func (p byPort) Len() int {
return len(p)
}
func (p byPort) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
func (p byPort) Less(i, j int) bool {
return *p[i].Port < *p[j].Port
}

View File

@@ -370,7 +370,7 @@ func TestEsDataChanged(t *testing.T) {
ObjectMeta: objMeta, ObjectMeta: objMeta,
Ports: []discovery.EndpointPort{port80, port443}, Ports: []discovery.EndpointPort{port80, port443},
}, },
expectChanged: false, expectChanged: true,
}, },
"port removed": { "port removed": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
@@ -422,7 +422,7 @@ func TestEsDataChanged(t *testing.T) {
Ports: []discovery.EndpointPort{port443}, Ports: []discovery.EndpointPort{port443},
Endpoints: []discovery.Endpoint{endpoint2, endpoint1}, Endpoints: []discovery.Endpoint{endpoint2, endpoint1},
}, },
expectChanged: false, expectChanged: true,
}, },
"identical with endpoint added": { "identical with endpoint added": {
cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil), cache: NewEndpointSliceCache("", v1.IPv4Protocol, nil, nil),
@@ -454,7 +454,7 @@ func TestEsDataChanged(t *testing.T) {
t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err) t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err)
} }
esData := newEndpointSliceData(tc.updatedSlice, false) esData := &endpointSliceData{tc.updatedSlice, false}
changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData) changed := tc.cache.esDataChanged(serviceKey, sliceKey, esData)
if tc.expectChanged != changed { if tc.expectChanged != changed {