mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #83206 from robscott/endpointslice-proxy-endpointchange-perf
Reworking kube-proxy to only compute endpointChanges on apply
This commit is contained in:
commit
30ba9f6548
@ -92,7 +92,7 @@ type EndpointChangeTracker struct {
|
||||
items map[types.NamespacedName]*endpointsChange
|
||||
// makeEndpointInfo allows proxier to inject customized information when processing endpoint.
|
||||
makeEndpointInfo makeEndpointFunc
|
||||
// endpointSliceCache holds a simplified version of endpoint slices
|
||||
// endpointSliceCache holds a simplified version of endpoint slices.
|
||||
endpointSliceCache *EndpointSliceCache
|
||||
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
|
||||
isIPv6Mode *bool
|
||||
@ -190,39 +190,54 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
|
||||
ect.lock.Lock()
|
||||
defer ect.lock.Unlock()
|
||||
|
||||
change, ok := ect.items[namespacedName]
|
||||
if !ok {
|
||||
change = &endpointsChange{}
|
||||
change.previous = ect.endpointSliceCache.EndpointsMap(namespacedName)
|
||||
ect.items[namespacedName] = change
|
||||
changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice)
|
||||
|
||||
if changeNeeded {
|
||||
metrics.EndpointChangesPending.Inc()
|
||||
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() {
|
||||
ect.lastChangeTriggerTimes[namespacedName] =
|
||||
append(ect.lastChangeTriggerTimes[namespacedName], t)
|
||||
}
|
||||
}
|
||||
|
||||
if removeSlice {
|
||||
ect.endpointSliceCache.Delete(endpointSlice)
|
||||
} else {
|
||||
ect.endpointSliceCache.Update(endpointSlice)
|
||||
return changeNeeded
|
||||
}
|
||||
|
||||
// checkoutChanges returns a list of pending endpointsChanges and marks them as
|
||||
// applied.
|
||||
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
|
||||
ect.lock.Lock()
|
||||
defer ect.lock.Unlock()
|
||||
|
||||
metrics.EndpointChangesPending.Set(0)
|
||||
|
||||
if ect.endpointSliceCache != nil {
|
||||
return ect.endpointSliceCache.checkoutChanges()
|
||||
}
|
||||
|
||||
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() {
|
||||
ect.lastChangeTriggerTimes[namespacedName] =
|
||||
append(ect.lastChangeTriggerTimes[namespacedName], t)
|
||||
changes := []*endpointsChange{}
|
||||
for _, change := range ect.items {
|
||||
changes = append(changes, change)
|
||||
}
|
||||
ect.items = make(map[types.NamespacedName]*endpointsChange)
|
||||
return changes
|
||||
}
|
||||
|
||||
change.current = ect.endpointSliceCache.EndpointsMap(namespacedName)
|
||||
// if change.previous equal to change.current, it means no change
|
||||
if reflect.DeepEqual(change.previous, change.current) {
|
||||
delete(ect.items, namespacedName)
|
||||
// Reset the lastChangeTriggerTimes for this service. Given that the network programming
|
||||
// SLI is defined as the duration between a time of an event and a time when the network was
|
||||
// programmed to incorporate that event, if there are events that happened between two
|
||||
// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
|
||||
// there will be no network programming for them and thus no network programming latency metric
|
||||
// should be exported.
|
||||
delete(ect.lastChangeTriggerTimes, namespacedName)
|
||||
// checkoutTriggerTimes applies the locally cached trigger times to a map of
|
||||
// trigger times that have been passed in and empties the local cache.
|
||||
func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||
ect.lock.Lock()
|
||||
defer ect.lock.Unlock()
|
||||
|
||||
for k, v := range ect.lastChangeTriggerTimes {
|
||||
prev, ok := (*lastChangeTriggerTimes)[k]
|
||||
if !ok {
|
||||
(*lastChangeTriggerTimes)[k] = v
|
||||
} else {
|
||||
(*lastChangeTriggerTimes)[k] = append(prev, v...)
|
||||
}
|
||||
}
|
||||
|
||||
metrics.EndpointChangesPending.Set(float64(len(ect.items)))
|
||||
return len(ect.items) > 0
|
||||
ect.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
||||
}
|
||||
|
||||
// getLastChangeTriggerTime returns the time.Time value of the
|
||||
@ -351,29 +366,19 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
|
||||
// The changes map is cleared after applying them.
|
||||
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
|
||||
// that were changed and will result in syncing the proxy rules.
|
||||
func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
|
||||
func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
|
||||
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||
if changes == nil {
|
||||
if ect == nil {
|
||||
return
|
||||
}
|
||||
changes.lock.Lock()
|
||||
defer changes.lock.Unlock()
|
||||
for _, change := range changes.items {
|
||||
|
||||
changes := ect.checkoutChanges()
|
||||
for _, change := range changes {
|
||||
em.unmerge(change.previous)
|
||||
em.merge(change.current)
|
||||
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
|
||||
}
|
||||
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
||||
metrics.EndpointChangesPending.Set(0)
|
||||
for k, v := range changes.lastChangeTriggerTimes {
|
||||
prev, ok := (*lastChangeTriggerTimes)[k]
|
||||
if !ok {
|
||||
(*lastChangeTriggerTimes)[k] = v
|
||||
} else {
|
||||
(*lastChangeTriggerTimes)[k] = append(prev, v...)
|
||||
}
|
||||
}
|
||||
changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
||||
ect.checkoutTriggerTimes(lastChangeTriggerTimes)
|
||||
}
|
||||
|
||||
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
|
||||
|
@ -1630,27 +1630,147 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
for _, startingSlice := range tc.startingSlices {
|
||||
tc.endpointChangeTracker.endpointSliceCache.Update(startingSlice)
|
||||
}
|
||||
t.Run(name, func(t *testing.T) {
|
||||
initializeCache(tc.endpointChangeTracker.endpointSliceCache, tc.startingSlices)
|
||||
|
||||
got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
|
||||
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
|
||||
t.Errorf("[%s] EndpointSliceUpdate return value got: %v, want %v", name, got, tc.expectedReturnVal)
|
||||
}
|
||||
if tc.endpointChangeTracker.items == nil {
|
||||
t.Errorf("[%s] Expected ect.items to not be nil", name)
|
||||
}
|
||||
if tc.expectedCurrentChange == nil {
|
||||
if tc.endpointChangeTracker.items[tc.namespacedName] != nil {
|
||||
t.Errorf("[%s] Expected ect.items[%s] to be nil", name, tc.namespacedName)
|
||||
got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
|
||||
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
|
||||
t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal)
|
||||
}
|
||||
} else {
|
||||
if tc.endpointChangeTracker.items[tc.namespacedName] == nil {
|
||||
t.Errorf("[%s] Expected ect.items[%s] to not be nil", name, tc.namespacedName)
|
||||
if tc.endpointChangeTracker.items == nil {
|
||||
t.Errorf("Expected ect.items to not be nil")
|
||||
}
|
||||
compareEndpointsMapsStr(t, tc.endpointChangeTracker.items[tc.namespacedName].current, tc.expectedCurrentChange)
|
||||
}
|
||||
changes := tc.endpointChangeTracker.checkoutChanges()
|
||||
if tc.expectedCurrentChange == nil {
|
||||
if len(changes) != 0 {
|
||||
t.Errorf("Expected %s to have no changes", tc.namespacedName)
|
||||
}
|
||||
} else {
|
||||
if len(changes) == 0 || changes[0] == nil {
|
||||
t.Fatalf("Expected %s to have changes", tc.namespacedName)
|
||||
}
|
||||
compareEndpointsMapsStr(t, changes[0].current, tc.expectedCurrentChange)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckoutChanges(t *testing.T) {
|
||||
svcPortName0 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-0", v1.ProtocolTCP}
|
||||
svcPortName1 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-1", v1.ProtocolTCP}
|
||||
|
||||
testCases := map[string]struct {
|
||||
endpointChangeTracker *EndpointChangeTracker
|
||||
expectedChanges []*endpointsChange
|
||||
useEndpointSlices bool
|
||||
items map[types.NamespacedName]*endpointsChange
|
||||
appliedSlices []*discovery.EndpointSlice
|
||||
pendingSlices []*discovery.EndpointSlice
|
||||
}{
|
||||
"empty slices": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
|
||||
expectedChanges: []*endpointsChange{},
|
||||
useEndpointSlices: true,
|
||||
appliedSlices: []*discovery.EndpointSlice{},
|
||||
pendingSlices: []*discovery.EndpointSlice{},
|
||||
},
|
||||
"without slices, empty items": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
|
||||
expectedChanges: []*endpointsChange{},
|
||||
items: map[types.NamespacedName]*endpointsChange{},
|
||||
useEndpointSlices: false,
|
||||
},
|
||||
"without slices, simple items": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||
},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
},
|
||||
}},
|
||||
items: map[types.NamespacedName]*endpointsChange{
|
||||
{Namespace: "ns1", Name: "svc1"}: {
|
||||
previous: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||
},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
},
|
||||
},
|
||||
},
|
||||
useEndpointSlices: false,
|
||||
},
|
||||
"adding initial slice": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
},
|
||||
}},
|
||||
useEndpointSlices: true,
|
||||
appliedSlices: []*discovery.EndpointSlice{},
|
||||
pendingSlices: []*discovery.EndpointSlice{
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
|
||||
},
|
||||
},
|
||||
"removing port in update": {
|
||||
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
|
||||
expectedChanges: []*endpointsChange{{
|
||||
previous: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||
},
|
||||
current: EndpointsMap{
|
||||
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||
},
|
||||
}},
|
||||
useEndpointSlices: true,
|
||||
appliedSlices: []*discovery.EndpointSlice{
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
|
||||
},
|
||||
pendingSlices: []*discovery.EndpointSlice{
|
||||
generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
if tc.useEndpointSlices {
|
||||
for _, slice := range tc.appliedSlices {
|
||||
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
|
||||
}
|
||||
tc.endpointChangeTracker.checkoutChanges()
|
||||
for _, slice := range tc.pendingSlices {
|
||||
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
|
||||
}
|
||||
} else {
|
||||
tc.endpointChangeTracker.items = tc.items
|
||||
}
|
||||
|
||||
changes := tc.endpointChangeTracker.checkoutChanges()
|
||||
|
||||
if len(tc.expectedChanges) != len(changes) {
|
||||
t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes))
|
||||
}
|
||||
|
||||
for i, change := range changes {
|
||||
expectedChange := tc.expectedChanges[i]
|
||||
|
||||
if !reflect.DeepEqual(change.previous, expectedChange.previous) {
|
||||
t.Errorf("[%d] Expected change.previous: %+v, got: %+v", i, expectedChange.previous, change.previous)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(change.current, expectedChange.current) {
|
||||
t.Errorf("[%d] Expected change.current: %+v, got: %+v", i, expectedChange.current, change.current)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@ -1679,3 +1799,18 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newTestEp(ep string) *BaseEndpointInfo {
|
||||
return &BaseEndpointInfo{Endpoint: ep}
|
||||
}
|
||||
|
||||
func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*discovery.EndpointSlice) {
|
||||
for _, endpointSlice := range endpointSlices {
|
||||
endpointSliceCache.updatePending(endpointSlice, false)
|
||||
}
|
||||
|
||||
for _, tracker := range endpointSliceCache.trackerByServiceMap {
|
||||
tracker.applied = tracker.pending
|
||||
tracker.pending = endpointSliceInfoByName{}
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,10 @@ package proxy
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1alpha1"
|
||||
@ -31,24 +34,41 @@ import (
|
||||
|
||||
// EndpointSliceCache is used as a cache of EndpointSlice information.
|
||||
type EndpointSliceCache struct {
|
||||
// sliceByServiceMap is the basis of this cache. It contains endpoint slice
|
||||
// info grouped by service name and endpoint slice name. The first key
|
||||
// represents a namespaced service name while the second key represents
|
||||
// lock protects trackerByServiceMap.
|
||||
lock sync.Mutex
|
||||
|
||||
// trackerByServiceMap is the basis of this cache. It contains endpoint
|
||||
// slice trackers grouped by service name and endpoint slice name. The first
|
||||
// key represents a namespaced service name while the second key represents
|
||||
// an endpoint slice name. Since endpoints can move between slices, we
|
||||
// require slice specific caching to prevent endpoints being removed from
|
||||
// the cache when they may have just moved to a different slice.
|
||||
sliceByServiceMap map[types.NamespacedName]map[string]*endpointSliceInfo
|
||||
makeEndpointInfo makeEndpointFunc
|
||||
hostname string
|
||||
isIPv6Mode *bool
|
||||
recorder record.EventRecorder
|
||||
trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker
|
||||
|
||||
makeEndpointInfo makeEndpointFunc
|
||||
hostname string
|
||||
isIPv6Mode *bool
|
||||
recorder record.EventRecorder
|
||||
}
|
||||
|
||||
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
|
||||
// by a proxier along with any pending EndpointSlices that have been updated
|
||||
// in this cache but not yet applied by a proxier.
|
||||
type endpointSliceTracker struct {
|
||||
applied endpointSliceInfoByName
|
||||
pending endpointSliceInfoByName
|
||||
}
|
||||
|
||||
// endpointSliceInfoByName groups endpointSliceInfo by the names of the
|
||||
// corresponding EndpointSlices.
|
||||
type endpointSliceInfoByName map[string]*endpointSliceInfo
|
||||
|
||||
// endpointSliceInfo contains just the attributes kube-proxy cares about.
|
||||
// Used for caching. Intentionally small to limit memory util.
|
||||
type endpointSliceInfo struct {
|
||||
Ports []discovery.EndpointPort
|
||||
Endpoints []*endpointInfo
|
||||
Remove bool
|
||||
}
|
||||
|
||||
// endpointInfo contains just the attributes kube-proxy cares about.
|
||||
@ -69,69 +89,122 @@ func NewEndpointSliceCache(hostname string, isIPv6Mode *bool, recorder record.Ev
|
||||
makeEndpointInfo = standardEndpointInfo
|
||||
}
|
||||
return &EndpointSliceCache{
|
||||
sliceByServiceMap: map[types.NamespacedName]map[string]*endpointSliceInfo{},
|
||||
hostname: hostname,
|
||||
isIPv6Mode: isIPv6Mode,
|
||||
makeEndpointInfo: makeEndpointInfo,
|
||||
recorder: recorder,
|
||||
trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
|
||||
hostname: hostname,
|
||||
isIPv6Mode: isIPv6Mode,
|
||||
makeEndpointInfo: makeEndpointInfo,
|
||||
recorder: recorder,
|
||||
}
|
||||
}
|
||||
|
||||
// newEndpointSliceTracker initializes an endpointSliceTracker.
|
||||
func newEndpointSliceTracker() *endpointSliceTracker {
|
||||
return &endpointSliceTracker{
|
||||
applied: endpointSliceInfoByName{},
|
||||
pending: endpointSliceInfoByName{},
|
||||
}
|
||||
}
|
||||
|
||||
// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
|
||||
func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
|
||||
esInfo := &endpointSliceInfo{
|
||||
Ports: endpointSlice.Ports,
|
||||
Endpoints: []*endpointInfo{},
|
||||
Remove: remove,
|
||||
}
|
||||
|
||||
sort.Sort(byPort(esInfo.Ports))
|
||||
|
||||
if !remove {
|
||||
for _, endpoint := range endpointSlice.Endpoints {
|
||||
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
|
||||
esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
|
||||
Addresses: endpoint.Addresses,
|
||||
Topology: endpoint.Topology,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
sort.Sort(byAddress(esInfo.Endpoints))
|
||||
}
|
||||
|
||||
return esInfo
|
||||
}
|
||||
|
||||
// standardEndpointInfo is the default makeEndpointFunc.
|
||||
func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
|
||||
return ep
|
||||
}
|
||||
|
||||
// Update a slice in the cache.
|
||||
func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) {
|
||||
// updatePending updates a pending slice in the cache.
|
||||
func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool {
|
||||
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
|
||||
if err != nil {
|
||||
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
esInfo := &endpointSliceInfo{
|
||||
Ports: endpointSlice.Ports,
|
||||
Endpoints: []*endpointInfo{},
|
||||
esInfo := newEndpointSliceInfo(endpointSlice, remove)
|
||||
|
||||
cache.lock.Lock()
|
||||
defer cache.lock.Unlock()
|
||||
|
||||
if _, ok := cache.trackerByServiceMap[serviceKey]; !ok {
|
||||
cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker()
|
||||
}
|
||||
for _, endpoint := range endpointSlice.Endpoints {
|
||||
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
|
||||
esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
|
||||
Addresses: endpoint.Addresses,
|
||||
Topology: endpoint.Topology,
|
||||
})
|
||||
|
||||
changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo)
|
||||
|
||||
if changed {
|
||||
cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo
|
||||
}
|
||||
|
||||
return changed
|
||||
}
|
||||
|
||||
// checkoutChanges returns a list of all endpointsChanges that are
|
||||
// pending and then marks them as applied.
|
||||
func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {
|
||||
changes := []*endpointsChange{}
|
||||
|
||||
cache.lock.Lock()
|
||||
defer cache.lock.Unlock()
|
||||
|
||||
for serviceNN, esTracker := range cache.trackerByServiceMap {
|
||||
if len(esTracker.pending) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
change := &endpointsChange{}
|
||||
|
||||
change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
|
||||
|
||||
for name, sliceInfo := range esTracker.pending {
|
||||
if sliceInfo.Remove {
|
||||
delete(esTracker.applied, name)
|
||||
} else {
|
||||
esTracker.applied[name] = sliceInfo
|
||||
}
|
||||
|
||||
delete(esTracker.pending, name)
|
||||
}
|
||||
|
||||
change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
|
||||
changes = append(changes, change)
|
||||
}
|
||||
if _, exists := cache.sliceByServiceMap[serviceKey]; !exists {
|
||||
cache.sliceByServiceMap[serviceKey] = map[string]*endpointSliceInfo{}
|
||||
}
|
||||
cache.sliceByServiceMap[serviceKey][sliceKey] = esInfo
|
||||
|
||||
return changes
|
||||
}
|
||||
|
||||
// Delete a slice from the cache.
|
||||
func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) {
|
||||
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
|
||||
if err != nil {
|
||||
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
|
||||
return
|
||||
}
|
||||
delete(cache.sliceByServiceMap[serviceKey], sliceKey)
|
||||
}
|
||||
|
||||
// EndpointsMap computes an EndpointsMap for a given service.
|
||||
func (cache *EndpointSliceCache) EndpointsMap(serviceNN types.NamespacedName) EndpointsMap {
|
||||
endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN)
|
||||
// getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices.
|
||||
func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) EndpointsMap {
|
||||
endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName)
|
||||
return endpointsMapFromEndpointInfo(endpointInfoBySP)
|
||||
}
|
||||
|
||||
// endpointInfoByServicePort groups endpoint info by service port name and address.
|
||||
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName) spToEndpointMap {
|
||||
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap {
|
||||
endpointInfoBySP := spToEndpointMap{}
|
||||
sliceInfoByName, ok := cache.sliceByServiceMap[serviceNN]
|
||||
|
||||
if !ok {
|
||||
return endpointInfoBySP
|
||||
}
|
||||
|
||||
for _, sliceInfo := range sliceInfoByName {
|
||||
for _, port := range sliceInfo.Ports {
|
||||
@ -198,6 +271,36 @@ func (cache *EndpointSliceCache) isLocal(hostname string) bool {
|
||||
return len(cache.hostname) > 0 && hostname == cache.hostname
|
||||
}
|
||||
|
||||
// esInfoChanged returns true if the esInfo parameter should be set as a new
|
||||
// pending value in the cache.
|
||||
func (cache *EndpointSliceCache) esInfoChanged(serviceKey types.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool {
|
||||
if _, ok := cache.trackerByServiceMap[serviceKey]; ok {
|
||||
appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey]
|
||||
pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey]
|
||||
|
||||
// If there's already a pending value, return whether or not this would
|
||||
// change that.
|
||||
if pendingOk {
|
||||
return !reflect.DeepEqual(esInfo, pendingInfo)
|
||||
}
|
||||
|
||||
// If there's already an applied value, return whether or not this would
|
||||
// change that.
|
||||
if appliedOk {
|
||||
return !reflect.DeepEqual(esInfo, appliedInfo)
|
||||
}
|
||||
}
|
||||
|
||||
// If this is marked for removal and does not exist in the cache, no changes
|
||||
// are necessary.
|
||||
if esInfo.Remove {
|
||||
return false
|
||||
}
|
||||
|
||||
// If not in the cache, and not marked for removal, it should be added.
|
||||
return true
|
||||
}
|
||||
|
||||
// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
|
||||
// has been grouped by service port and IP.
|
||||
func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
|
||||
@ -242,6 +345,19 @@ func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.Names
|
||||
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
|
||||
}
|
||||
|
||||
// byAddress helps sort endpointInfo
|
||||
type byAddress []*endpointInfo
|
||||
|
||||
func (e byAddress) Len() int {
|
||||
return len(e)
|
||||
}
|
||||
func (e byAddress) Swap(i, j int) {
|
||||
e[i], e[j] = e[j], e[i]
|
||||
}
|
||||
func (e byAddress) Less(i, j int) bool {
|
||||
return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",")
|
||||
}
|
||||
|
||||
// byIP helps sort endpoints by IP
|
||||
type byIP []Endpoint
|
||||
|
||||
@ -254,3 +370,16 @@ func (e byIP) Swap(i, j int) {
|
||||
func (e byIP) Less(i, j int) bool {
|
||||
return e[i].String() < e[j].String()
|
||||
}
|
||||
|
||||
// byPort helps sort EndpointSlice ports by port number
|
||||
type byPort []discovery.EndpointPort
|
||||
|
||||
func (p byPort) Len() int {
|
||||
return len(p)
|
||||
}
|
||||
func (p byPort) Swap(i, j int) {
|
||||
p[i], p[j] = p[j], p[i]
|
||||
}
|
||||
func (p byPort) Less(i, j int) bool {
|
||||
return *p[i].Port < *p[j].Port
|
||||
}
|
||||
|
@ -153,10 +153,10 @@ func TestEndpointsMapFromESC(t *testing.T) {
|
||||
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
||||
|
||||
for _, endpointSlice := range tc.endpointSlices {
|
||||
esCache.Update(endpointSlice)
|
||||
esCache.updatePending(endpointSlice, false)
|
||||
}
|
||||
|
||||
compareEndpointsMapsStr(t, esCache.EndpointsMap(tc.namespacedName), tc.expectedMap)
|
||||
compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -185,17 +185,153 @@ func TestEndpointInfoByServicePort(t *testing.T) {
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
||||
t.Run(name, func(t *testing.T) {
|
||||
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
||||
|
||||
for _, endpointSlice := range tc.endpointSlices {
|
||||
esCache.Update(endpointSlice)
|
||||
}
|
||||
for _, endpointSlice := range tc.endpointSlices {
|
||||
esCache.updatePending(endpointSlice, false)
|
||||
}
|
||||
|
||||
got := esCache.endpointInfoByServicePort(tc.namespacedName)
|
||||
if !reflect.DeepEqual(got, tc.expectedMap) {
|
||||
t.Errorf("[%s] endpointInfoByServicePort does not match. Want: %+v, Got: %+v", name, tc.expectedMap, got)
|
||||
}
|
||||
got := esCache.endpointInfoByServicePort(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending)
|
||||
if !reflect.DeepEqual(got, tc.expectedMap) {
|
||||
t.Errorf("endpointInfoByServicePort does not match. Want: %+v, Got: %+v", tc.expectedMap, got)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEsInfoChanged(t *testing.T) {
|
||||
p80 := int32(80)
|
||||
p443 := int32(443)
|
||||
tcpProto := v1.ProtocolTCP
|
||||
port80 := discovery.EndpointPort{Port: &p80, Name: utilpointer.StringPtr("http"), Protocol: &tcpProto}
|
||||
port443 := discovery.EndpointPort{Port: &p443, Name: utilpointer.StringPtr("https"), Protocol: &tcpProto}
|
||||
endpoint1 := discovery.Endpoint{Addresses: []string{"10.0.1.0"}}
|
||||
endpoint2 := discovery.Endpoint{Addresses: []string{"10.0.1.1"}}
|
||||
|
||||
objMeta := metav1.ObjectMeta{
|
||||
Name: "foo",
|
||||
Namespace: "bar",
|
||||
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||
}
|
||||
|
||||
testCases := map[string]struct {
|
||||
cache *EndpointSliceCache
|
||||
initialSlice *discovery.EndpointSlice
|
||||
updatedSlice *discovery.EndpointSlice
|
||||
expectChanged bool
|
||||
}{
|
||||
"identical slices, ports only": {
|
||||
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||
initialSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port80},
|
||||
},
|
||||
updatedSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port80},
|
||||
},
|
||||
expectChanged: false,
|
||||
},
|
||||
"identical slices, ports out of order": {
|
||||
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||
initialSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443, port80},
|
||||
},
|
||||
updatedSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port80, port443},
|
||||
},
|
||||
expectChanged: false,
|
||||
},
|
||||
"port removed": {
|
||||
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||
initialSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443, port80},
|
||||
},
|
||||
updatedSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
},
|
||||
expectChanged: true,
|
||||
},
|
||||
"port added": {
|
||||
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||
initialSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
},
|
||||
updatedSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443, port80},
|
||||
},
|
||||
expectChanged: true,
|
||||
},
|
||||
"identical with endpoints": {
|
||||
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||
initialSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
|
||||
},
|
||||
updatedSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
|
||||
},
|
||||
expectChanged: false,
|
||||
},
|
||||
"identical with endpoints out of order": {
|
||||
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||
initialSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
|
||||
},
|
||||
updatedSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
Endpoints: []discovery.Endpoint{endpoint2, endpoint1},
|
||||
},
|
||||
expectChanged: false,
|
||||
},
|
||||
"identical with endpoint added": {
|
||||
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||
initialSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
Endpoints: []discovery.Endpoint{endpoint1},
|
||||
},
|
||||
updatedSlice: &discovery.EndpointSlice{
|
||||
ObjectMeta: objMeta,
|
||||
Ports: []discovery.EndpointPort{port443},
|
||||
Endpoints: []discovery.Endpoint{endpoint2, endpoint1},
|
||||
},
|
||||
expectChanged: true,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tc := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
if tc.initialSlice != nil {
|
||||
tc.cache.updatePending(tc.initialSlice, false)
|
||||
tc.cache.checkoutChanges()
|
||||
}
|
||||
|
||||
serviceKey, sliceKey, err := endpointSliceCacheKeys(tc.updatedSlice)
|
||||
if err != nil {
|
||||
t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err)
|
||||
}
|
||||
|
||||
esInfo := newEndpointSliceInfo(tc.updatedSlice, false)
|
||||
changed := tc.cache.esInfoChanged(serviceKey, sliceKey, esInfo)
|
||||
|
||||
if tc.expectChanged != changed {
|
||||
t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user