mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-04 09:49:50 +00:00
Reworking kube-proxy to only compute endpointChanges on apply.
Computing EndpointChanges is a relatively expensive operation for kube-proxy when Endpoint Slices are used. This had been computed on every EndpointSlice update which became quite inefficient at high levels of scale when multiple EndpointSlice update events would be triggered before a syncProxyRules call. Profiling results showed that computing this on each update could consume ~80% of total kube-proxy CPU utilization at high levels of scale. This change reduced that to as little as 3% of total kube-proxy utilization at high levels of scale. It's worth noting that the difference is minimal when there is a 1:1 relationship between EndpointSlice updates and proxier syncs. This is primarily beneficial when there are many EndpointSlice updates between proxier sync loops.
This commit is contained in:
parent
f1bb6089ce
commit
8e7de45034
@ -92,7 +92,7 @@ type EndpointChangeTracker struct {
|
|||||||
items map[types.NamespacedName]*endpointsChange
|
items map[types.NamespacedName]*endpointsChange
|
||||||
// makeEndpointInfo allows proxier to inject customized information when processing endpoint.
|
// makeEndpointInfo allows proxier to inject customized information when processing endpoint.
|
||||||
makeEndpointInfo makeEndpointFunc
|
makeEndpointInfo makeEndpointFunc
|
||||||
// endpointSliceCache holds a simplified version of endpoint slices
|
// endpointSliceCache holds a simplified version of endpoint slices.
|
||||||
endpointSliceCache *EndpointSliceCache
|
endpointSliceCache *EndpointSliceCache
|
||||||
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
|
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
|
||||||
isIPv6Mode *bool
|
isIPv6Mode *bool
|
||||||
@ -190,39 +190,54 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
|
|||||||
ect.lock.Lock()
|
ect.lock.Lock()
|
||||||
defer ect.lock.Unlock()
|
defer ect.lock.Unlock()
|
||||||
|
|
||||||
change, ok := ect.items[namespacedName]
|
changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice)
|
||||||
if !ok {
|
|
||||||
change = &endpointsChange{}
|
if changeNeeded {
|
||||||
change.previous = ect.endpointSliceCache.EndpointsMap(namespacedName)
|
metrics.EndpointChangesPending.Inc()
|
||||||
ect.items[namespacedName] = change
|
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() {
|
||||||
|
ect.lastChangeTriggerTimes[namespacedName] =
|
||||||
|
append(ect.lastChangeTriggerTimes[namespacedName], t)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if removeSlice {
|
return changeNeeded
|
||||||
ect.endpointSliceCache.Delete(endpointSlice)
|
}
|
||||||
} else {
|
|
||||||
ect.endpointSliceCache.Update(endpointSlice)
|
// checkoutChanges returns a list of pending endpointsChanges and marks them as
|
||||||
|
// applied.
|
||||||
|
func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange {
|
||||||
|
ect.lock.Lock()
|
||||||
|
defer ect.lock.Unlock()
|
||||||
|
|
||||||
|
metrics.EndpointChangesPending.Set(0)
|
||||||
|
|
||||||
|
if ect.endpointSliceCache != nil {
|
||||||
|
return ect.endpointSliceCache.checkoutChanges()
|
||||||
}
|
}
|
||||||
|
|
||||||
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() {
|
changes := []*endpointsChange{}
|
||||||
ect.lastChangeTriggerTimes[namespacedName] =
|
for _, change := range ect.items {
|
||||||
append(ect.lastChangeTriggerTimes[namespacedName], t)
|
changes = append(changes, change)
|
||||||
}
|
}
|
||||||
|
ect.items = make(map[types.NamespacedName]*endpointsChange)
|
||||||
|
return changes
|
||||||
|
}
|
||||||
|
|
||||||
change.current = ect.endpointSliceCache.EndpointsMap(namespacedName)
|
// checkoutTriggerTimes applies the locally cached trigger times to a map of
|
||||||
// if change.previous equal to change.current, it means no change
|
// trigger times that have been passed in and empties the local cache.
|
||||||
if reflect.DeepEqual(change.previous, change.current) {
|
func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||||
delete(ect.items, namespacedName)
|
ect.lock.Lock()
|
||||||
// Reset the lastChangeTriggerTimes for this service. Given that the network programming
|
defer ect.lock.Unlock()
|
||||||
// SLI is defined as the duration between a time of an event and a time when the network was
|
|
||||||
// programmed to incorporate that event, if there are events that happened between two
|
for k, v := range ect.lastChangeTriggerTimes {
|
||||||
// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
|
prev, ok := (*lastChangeTriggerTimes)[k]
|
||||||
// there will be no network programming for them and thus no network programming latency metric
|
if !ok {
|
||||||
// should be exported.
|
(*lastChangeTriggerTimes)[k] = v
|
||||||
delete(ect.lastChangeTriggerTimes, namespacedName)
|
} else {
|
||||||
|
(*lastChangeTriggerTimes)[k] = append(prev, v...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
ect.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
||||||
metrics.EndpointChangesPending.Set(float64(len(ect.items)))
|
|
||||||
return len(ect.items) > 0
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLastChangeTriggerTime returns the time.Time value of the
|
// getLastChangeTriggerTime returns the time.Time value of the
|
||||||
@ -351,29 +366,19 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
|
|||||||
// The changes map is cleared after applying them.
|
// The changes map is cleared after applying them.
|
||||||
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
|
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
|
||||||
// that were changed and will result in syncing the proxy rules.
|
// that were changed and will result in syncing the proxy rules.
|
||||||
func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
|
func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
|
||||||
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) {
|
||||||
if changes == nil {
|
if ect == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
changes.lock.Lock()
|
|
||||||
defer changes.lock.Unlock()
|
changes := ect.checkoutChanges()
|
||||||
for _, change := range changes.items {
|
for _, change := range changes {
|
||||||
em.unmerge(change.previous)
|
em.unmerge(change.previous)
|
||||||
em.merge(change.current)
|
em.merge(change.current)
|
||||||
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
|
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
|
||||||
}
|
}
|
||||||
changes.items = make(map[types.NamespacedName]*endpointsChange)
|
ect.checkoutTriggerTimes(lastChangeTriggerTimes)
|
||||||
metrics.EndpointChangesPending.Set(0)
|
|
||||||
for k, v := range changes.lastChangeTriggerTimes {
|
|
||||||
prev, ok := (*lastChangeTriggerTimes)[k]
|
|
||||||
if !ok {
|
|
||||||
(*lastChangeTriggerTimes)[k] = v
|
|
||||||
} else {
|
|
||||||
(*lastChangeTriggerTimes)[k] = append(prev, v...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
|
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.
|
||||||
|
@ -1630,27 +1630,147 @@ func TestEndpointSliceUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for name, tc := range testCases {
|
for name, tc := range testCases {
|
||||||
for _, startingSlice := range tc.startingSlices {
|
t.Run(name, func(t *testing.T) {
|
||||||
tc.endpointChangeTracker.endpointSliceCache.Update(startingSlice)
|
initializeCache(tc.endpointChangeTracker.endpointSliceCache, tc.startingSlices)
|
||||||
}
|
|
||||||
|
|
||||||
got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
|
got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice)
|
||||||
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
|
if !reflect.DeepEqual(got, tc.expectedReturnVal) {
|
||||||
t.Errorf("[%s] EndpointSliceUpdate return value got: %v, want %v", name, got, tc.expectedReturnVal)
|
t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal)
|
||||||
}
|
|
||||||
if tc.endpointChangeTracker.items == nil {
|
|
||||||
t.Errorf("[%s] Expected ect.items to not be nil", name)
|
|
||||||
}
|
|
||||||
if tc.expectedCurrentChange == nil {
|
|
||||||
if tc.endpointChangeTracker.items[tc.namespacedName] != nil {
|
|
||||||
t.Errorf("[%s] Expected ect.items[%s] to be nil", name, tc.namespacedName)
|
|
||||||
}
|
}
|
||||||
} else {
|
if tc.endpointChangeTracker.items == nil {
|
||||||
if tc.endpointChangeTracker.items[tc.namespacedName] == nil {
|
t.Errorf("Expected ect.items to not be nil")
|
||||||
t.Errorf("[%s] Expected ect.items[%s] to not be nil", name, tc.namespacedName)
|
|
||||||
}
|
}
|
||||||
compareEndpointsMapsStr(t, tc.endpointChangeTracker.items[tc.namespacedName].current, tc.expectedCurrentChange)
|
changes := tc.endpointChangeTracker.checkoutChanges()
|
||||||
}
|
if tc.expectedCurrentChange == nil {
|
||||||
|
if len(changes) != 0 {
|
||||||
|
t.Errorf("Expected %s to have no changes", tc.namespacedName)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if len(changes) == 0 || changes[0] == nil {
|
||||||
|
t.Fatalf("Expected %s to have changes", tc.namespacedName)
|
||||||
|
}
|
||||||
|
compareEndpointsMapsStr(t, changes[0].current, tc.expectedCurrentChange)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckoutChanges(t *testing.T) {
|
||||||
|
svcPortName0 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-0", v1.ProtocolTCP}
|
||||||
|
svcPortName1 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-1", v1.ProtocolTCP}
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
endpointChangeTracker *EndpointChangeTracker
|
||||||
|
expectedChanges []*endpointsChange
|
||||||
|
useEndpointSlices bool
|
||||||
|
items map[types.NamespacedName]*endpointsChange
|
||||||
|
appliedSlices []*discovery.EndpointSlice
|
||||||
|
pendingSlices []*discovery.EndpointSlice
|
||||||
|
}{
|
||||||
|
"empty slices": {
|
||||||
|
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
|
||||||
|
expectedChanges: []*endpointsChange{},
|
||||||
|
useEndpointSlices: true,
|
||||||
|
appliedSlices: []*discovery.EndpointSlice{},
|
||||||
|
pendingSlices: []*discovery.EndpointSlice{},
|
||||||
|
},
|
||||||
|
"without slices, empty items": {
|
||||||
|
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
|
||||||
|
expectedChanges: []*endpointsChange{},
|
||||||
|
items: map[types.NamespacedName]*endpointsChange{},
|
||||||
|
useEndpointSlices: false,
|
||||||
|
},
|
||||||
|
"without slices, simple items": {
|
||||||
|
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false),
|
||||||
|
expectedChanges: []*endpointsChange{{
|
||||||
|
previous: EndpointsMap{
|
||||||
|
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||||
|
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||||
|
},
|
||||||
|
current: EndpointsMap{
|
||||||
|
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
items: map[types.NamespacedName]*endpointsChange{
|
||||||
|
{Namespace: "ns1", Name: "svc1"}: {
|
||||||
|
previous: EndpointsMap{
|
||||||
|
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||||
|
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||||
|
},
|
||||||
|
current: EndpointsMap{
|
||||||
|
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
useEndpointSlices: false,
|
||||||
|
},
|
||||||
|
"adding initial slice": {
|
||||||
|
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
|
||||||
|
expectedChanges: []*endpointsChange{{
|
||||||
|
previous: EndpointsMap{},
|
||||||
|
current: EndpointsMap{
|
||||||
|
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
useEndpointSlices: true,
|
||||||
|
appliedSlices: []*discovery.EndpointSlice{},
|
||||||
|
pendingSlices: []*discovery.EndpointSlice{
|
||||||
|
generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"removing port in update": {
|
||||||
|
endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true),
|
||||||
|
expectedChanges: []*endpointsChange{{
|
||||||
|
previous: EndpointsMap{
|
||||||
|
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||||
|
svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")},
|
||||||
|
},
|
||||||
|
current: EndpointsMap{
|
||||||
|
svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")},
|
||||||
|
},
|
||||||
|
}},
|
||||||
|
useEndpointSlices: true,
|
||||||
|
appliedSlices: []*discovery.EndpointSlice{
|
||||||
|
generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}),
|
||||||
|
},
|
||||||
|
pendingSlices: []*discovery.EndpointSlice{
|
||||||
|
generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testCases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
if tc.useEndpointSlices {
|
||||||
|
for _, slice := range tc.appliedSlices {
|
||||||
|
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
|
||||||
|
}
|
||||||
|
tc.endpointChangeTracker.checkoutChanges()
|
||||||
|
for _, slice := range tc.pendingSlices {
|
||||||
|
tc.endpointChangeTracker.EndpointSliceUpdate(slice, false)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tc.endpointChangeTracker.items = tc.items
|
||||||
|
}
|
||||||
|
|
||||||
|
changes := tc.endpointChangeTracker.checkoutChanges()
|
||||||
|
|
||||||
|
if len(tc.expectedChanges) != len(changes) {
|
||||||
|
t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes))
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, change := range changes {
|
||||||
|
expectedChange := tc.expectedChanges[i]
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(change.previous, expectedChange.previous) {
|
||||||
|
t.Errorf("[%d] Expected change.previous: %+v, got: %+v", i, expectedChange.previous, change.previous)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !reflect.DeepEqual(change.current, expectedChange.current) {
|
||||||
|
t.Errorf("[%d] Expected change.current: %+v, got: %+v", i, expectedChange.current, change.current)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1679,3 +1799,18 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newTestEp(ep string) *BaseEndpointInfo {
|
||||||
|
return &BaseEndpointInfo{Endpoint: ep}
|
||||||
|
}
|
||||||
|
|
||||||
|
func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*discovery.EndpointSlice) {
|
||||||
|
for _, endpointSlice := range endpointSlices {
|
||||||
|
endpointSliceCache.updatePending(endpointSlice, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tracker := range endpointSliceCache.trackerByServiceMap {
|
||||||
|
tracker.applied = tracker.pending
|
||||||
|
tracker.pending = endpointSliceInfoByName{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -18,7 +18,10 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1alpha1"
|
discovery "k8s.io/api/discovery/v1alpha1"
|
||||||
@ -31,24 +34,41 @@ import (
|
|||||||
|
|
||||||
// EndpointSliceCache is used as a cache of EndpointSlice information.
|
// EndpointSliceCache is used as a cache of EndpointSlice information.
|
||||||
type EndpointSliceCache struct {
|
type EndpointSliceCache struct {
|
||||||
// sliceByServiceMap is the basis of this cache. It contains endpoint slice
|
// lock protects trackerByServiceMap.
|
||||||
// info grouped by service name and endpoint slice name. The first key
|
lock sync.Mutex
|
||||||
// represents a namespaced service name while the second key represents
|
|
||||||
|
// trackerByServiceMap is the basis of this cache. It contains endpoint
|
||||||
|
// slice trackers grouped by service name and endpoint slice name. The first
|
||||||
|
// key represents a namespaced service name while the second key represents
|
||||||
// an endpoint slice name. Since endpoints can move between slices, we
|
// an endpoint slice name. Since endpoints can move between slices, we
|
||||||
// require slice specific caching to prevent endpoints being removed from
|
// require slice specific caching to prevent endpoints being removed from
|
||||||
// the cache when they may have just moved to a different slice.
|
// the cache when they may have just moved to a different slice.
|
||||||
sliceByServiceMap map[types.NamespacedName]map[string]*endpointSliceInfo
|
trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker
|
||||||
makeEndpointInfo makeEndpointFunc
|
|
||||||
hostname string
|
makeEndpointInfo makeEndpointFunc
|
||||||
isIPv6Mode *bool
|
hostname string
|
||||||
recorder record.EventRecorder
|
isIPv6Mode *bool
|
||||||
|
recorder record.EventRecorder
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// endpointSliceTracker keeps track of EndpointSlices as they have been applied
|
||||||
|
// by a proxier along with any pending EndpointSlices that have been updated
|
||||||
|
// in this cache but not yet applied by a proxier.
|
||||||
|
type endpointSliceTracker struct {
|
||||||
|
applied endpointSliceInfoByName
|
||||||
|
pending endpointSliceInfoByName
|
||||||
|
}
|
||||||
|
|
||||||
|
// endpointSliceInfoByName groups endpointSliceInfo by the names of the
|
||||||
|
// corresponding EndpointSlices.
|
||||||
|
type endpointSliceInfoByName map[string]*endpointSliceInfo
|
||||||
|
|
||||||
// endpointSliceInfo contains just the attributes kube-proxy cares about.
|
// endpointSliceInfo contains just the attributes kube-proxy cares about.
|
||||||
// Used for caching. Intentionally small to limit memory util.
|
// Used for caching. Intentionally small to limit memory util.
|
||||||
type endpointSliceInfo struct {
|
type endpointSliceInfo struct {
|
||||||
Ports []discovery.EndpointPort
|
Ports []discovery.EndpointPort
|
||||||
Endpoints []*endpointInfo
|
Endpoints []*endpointInfo
|
||||||
|
Remove bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// endpointInfo contains just the attributes kube-proxy cares about.
|
// endpointInfo contains just the attributes kube-proxy cares about.
|
||||||
@ -69,69 +89,122 @@ func NewEndpointSliceCache(hostname string, isIPv6Mode *bool, recorder record.Ev
|
|||||||
makeEndpointInfo = standardEndpointInfo
|
makeEndpointInfo = standardEndpointInfo
|
||||||
}
|
}
|
||||||
return &EndpointSliceCache{
|
return &EndpointSliceCache{
|
||||||
sliceByServiceMap: map[types.NamespacedName]map[string]*endpointSliceInfo{},
|
trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{},
|
||||||
hostname: hostname,
|
hostname: hostname,
|
||||||
isIPv6Mode: isIPv6Mode,
|
isIPv6Mode: isIPv6Mode,
|
||||||
makeEndpointInfo: makeEndpointInfo,
|
makeEndpointInfo: makeEndpointInfo,
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newEndpointSliceTracker initializes an endpointSliceTracker.
|
||||||
|
func newEndpointSliceTracker() *endpointSliceTracker {
|
||||||
|
return &endpointSliceTracker{
|
||||||
|
applied: endpointSliceInfoByName{},
|
||||||
|
pending: endpointSliceInfoByName{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice.
|
||||||
|
func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo {
|
||||||
|
esInfo := &endpointSliceInfo{
|
||||||
|
Ports: endpointSlice.Ports,
|
||||||
|
Endpoints: []*endpointInfo{},
|
||||||
|
Remove: remove,
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(byPort(esInfo.Ports))
|
||||||
|
|
||||||
|
if !remove {
|
||||||
|
for _, endpoint := range endpointSlice.Endpoints {
|
||||||
|
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
|
||||||
|
esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
|
||||||
|
Addresses: endpoint.Addresses,
|
||||||
|
Topology: endpoint.Topology,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort.Sort(byAddress(esInfo.Endpoints))
|
||||||
|
}
|
||||||
|
|
||||||
|
return esInfo
|
||||||
|
}
|
||||||
|
|
||||||
// standardEndpointInfo is the default makeEndpointFunc.
|
// standardEndpointInfo is the default makeEndpointFunc.
|
||||||
func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
|
func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
|
||||||
return ep
|
return ep
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update a slice in the cache.
|
// updatePending updates a pending slice in the cache.
|
||||||
func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) {
|
func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool {
|
||||||
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
|
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
|
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
esInfo := &endpointSliceInfo{
|
esInfo := newEndpointSliceInfo(endpointSlice, remove)
|
||||||
Ports: endpointSlice.Ports,
|
|
||||||
Endpoints: []*endpointInfo{},
|
cache.lock.Lock()
|
||||||
|
defer cache.lock.Unlock()
|
||||||
|
|
||||||
|
if _, ok := cache.trackerByServiceMap[serviceKey]; !ok {
|
||||||
|
cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker()
|
||||||
}
|
}
|
||||||
for _, endpoint := range endpointSlice.Endpoints {
|
|
||||||
if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true {
|
changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo)
|
||||||
esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{
|
|
||||||
Addresses: endpoint.Addresses,
|
if changed {
|
||||||
Topology: endpoint.Topology,
|
cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo
|
||||||
})
|
}
|
||||||
|
|
||||||
|
return changed
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkoutChanges returns a list of all endpointsChanges that are
|
||||||
|
// pending and then marks them as applied.
|
||||||
|
func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange {
|
||||||
|
changes := []*endpointsChange{}
|
||||||
|
|
||||||
|
cache.lock.Lock()
|
||||||
|
defer cache.lock.Unlock()
|
||||||
|
|
||||||
|
for serviceNN, esTracker := range cache.trackerByServiceMap {
|
||||||
|
if len(esTracker.pending) == 0 {
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
change := &endpointsChange{}
|
||||||
|
|
||||||
|
change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied)
|
||||||
|
|
||||||
|
for name, sliceInfo := range esTracker.pending {
|
||||||
|
if sliceInfo.Remove {
|
||||||
|
delete(esTracker.applied, name)
|
||||||
|
} else {
|
||||||
|
esTracker.applied[name] = sliceInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(esTracker.pending, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
|
||||||
|
changes = append(changes, change)
|
||||||
}
|
}
|
||||||
if _, exists := cache.sliceByServiceMap[serviceKey]; !exists {
|
|
||||||
cache.sliceByServiceMap[serviceKey] = map[string]*endpointSliceInfo{}
|
return changes
|
||||||
}
|
|
||||||
cache.sliceByServiceMap[serviceKey][sliceKey] = esInfo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete a slice from the cache.
|
// getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices.
|
||||||
func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) {
|
func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) EndpointsMap {
|
||||||
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
|
endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName)
|
||||||
if err != nil {
|
|
||||||
klog.Warningf("Error getting endpoint slice cache keys: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
delete(cache.sliceByServiceMap[serviceKey], sliceKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
// EndpointsMap computes an EndpointsMap for a given service.
|
|
||||||
func (cache *EndpointSliceCache) EndpointsMap(serviceNN types.NamespacedName) EndpointsMap {
|
|
||||||
endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN)
|
|
||||||
return endpointsMapFromEndpointInfo(endpointInfoBySP)
|
return endpointsMapFromEndpointInfo(endpointInfoBySP)
|
||||||
}
|
}
|
||||||
|
|
||||||
// endpointInfoByServicePort groups endpoint info by service port name and address.
|
// endpointInfoByServicePort groups endpoint info by service port name and address.
|
||||||
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName) spToEndpointMap {
|
func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap {
|
||||||
endpointInfoBySP := spToEndpointMap{}
|
endpointInfoBySP := spToEndpointMap{}
|
||||||
sliceInfoByName, ok := cache.sliceByServiceMap[serviceNN]
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return endpointInfoBySP
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, sliceInfo := range sliceInfoByName {
|
for _, sliceInfo := range sliceInfoByName {
|
||||||
for _, port := range sliceInfo.Ports {
|
for _, port := range sliceInfo.Ports {
|
||||||
@ -198,6 +271,36 @@ func (cache *EndpointSliceCache) isLocal(hostname string) bool {
|
|||||||
return len(cache.hostname) > 0 && hostname == cache.hostname
|
return len(cache.hostname) > 0 && hostname == cache.hostname
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// esInfoChanged returns true if the esInfo parameter should be set as a new
|
||||||
|
// pending value in the cache.
|
||||||
|
func (cache *EndpointSliceCache) esInfoChanged(serviceKey types.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool {
|
||||||
|
if _, ok := cache.trackerByServiceMap[serviceKey]; ok {
|
||||||
|
appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey]
|
||||||
|
pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey]
|
||||||
|
|
||||||
|
// If there's already a pending value, return whether or not this would
|
||||||
|
// change that.
|
||||||
|
if pendingOk {
|
||||||
|
return !reflect.DeepEqual(esInfo, pendingInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there's already an applied value, return whether or not this would
|
||||||
|
// change that.
|
||||||
|
if appliedOk {
|
||||||
|
return !reflect.DeepEqual(esInfo, appliedInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this is marked for removal and does not exist in the cache, no changes
|
||||||
|
// are necessary.
|
||||||
|
if esInfo.Remove {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// If not in the cache, and not marked for removal, it should be added.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
|
// endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that
|
||||||
// has been grouped by service port and IP.
|
// has been grouped by service port and IP.
|
||||||
func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
|
func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap {
|
||||||
@ -242,6 +345,19 @@ func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.Names
|
|||||||
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
|
return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// byAddress helps sort endpointInfo
|
||||||
|
type byAddress []*endpointInfo
|
||||||
|
|
||||||
|
func (e byAddress) Len() int {
|
||||||
|
return len(e)
|
||||||
|
}
|
||||||
|
func (e byAddress) Swap(i, j int) {
|
||||||
|
e[i], e[j] = e[j], e[i]
|
||||||
|
}
|
||||||
|
func (e byAddress) Less(i, j int) bool {
|
||||||
|
return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",")
|
||||||
|
}
|
||||||
|
|
||||||
// byIP helps sort endpoints by IP
|
// byIP helps sort endpoints by IP
|
||||||
type byIP []Endpoint
|
type byIP []Endpoint
|
||||||
|
|
||||||
@ -254,3 +370,16 @@ func (e byIP) Swap(i, j int) {
|
|||||||
func (e byIP) Less(i, j int) bool {
|
func (e byIP) Less(i, j int) bool {
|
||||||
return e[i].String() < e[j].String()
|
return e[i].String() < e[j].String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// byPort helps sort EndpointSlice ports by port number
|
||||||
|
type byPort []discovery.EndpointPort
|
||||||
|
|
||||||
|
func (p byPort) Len() int {
|
||||||
|
return len(p)
|
||||||
|
}
|
||||||
|
func (p byPort) Swap(i, j int) {
|
||||||
|
p[i], p[j] = p[j], p[i]
|
||||||
|
}
|
||||||
|
func (p byPort) Less(i, j int) bool {
|
||||||
|
return *p[i].Port < *p[j].Port
|
||||||
|
}
|
||||||
|
@ -153,10 +153,10 @@ func TestEndpointsMapFromESC(t *testing.T) {
|
|||||||
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
||||||
|
|
||||||
for _, endpointSlice := range tc.endpointSlices {
|
for _, endpointSlice := range tc.endpointSlices {
|
||||||
esCache.Update(endpointSlice)
|
esCache.updatePending(endpointSlice, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
compareEndpointsMapsStr(t, esCache.EndpointsMap(tc.namespacedName), tc.expectedMap)
|
compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -185,17 +185,153 @@ func TestEndpointInfoByServicePort(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for name, tc := range testCases {
|
for name, tc := range testCases {
|
||||||
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
t.Run(name, func(t *testing.T) {
|
||||||
|
esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil)
|
||||||
|
|
||||||
for _, endpointSlice := range tc.endpointSlices {
|
for _, endpointSlice := range tc.endpointSlices {
|
||||||
esCache.Update(endpointSlice)
|
esCache.updatePending(endpointSlice, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
got := esCache.endpointInfoByServicePort(tc.namespacedName)
|
got := esCache.endpointInfoByServicePort(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending)
|
||||||
if !reflect.DeepEqual(got, tc.expectedMap) {
|
if !reflect.DeepEqual(got, tc.expectedMap) {
|
||||||
t.Errorf("[%s] endpointInfoByServicePort does not match. Want: %+v, Got: %+v", name, tc.expectedMap, got)
|
t.Errorf("endpointInfoByServicePort does not match. Want: %+v, Got: %+v", tc.expectedMap, got)
|
||||||
}
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEsInfoChanged(t *testing.T) {
|
||||||
|
p80 := int32(80)
|
||||||
|
p443 := int32(443)
|
||||||
|
tcpProto := v1.ProtocolTCP
|
||||||
|
port80 := discovery.EndpointPort{Port: &p80, Name: utilpointer.StringPtr("http"), Protocol: &tcpProto}
|
||||||
|
port443 := discovery.EndpointPort{Port: &p443, Name: utilpointer.StringPtr("https"), Protocol: &tcpProto}
|
||||||
|
endpoint1 := discovery.Endpoint{Addresses: []string{"10.0.1.0"}}
|
||||||
|
endpoint2 := discovery.Endpoint{Addresses: []string{"10.0.1.1"}}
|
||||||
|
|
||||||
|
objMeta := metav1.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: "bar",
|
||||||
|
Labels: map[string]string{discovery.LabelServiceName: "svc1"},
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
cache *EndpointSliceCache
|
||||||
|
initialSlice *discovery.EndpointSlice
|
||||||
|
updatedSlice *discovery.EndpointSlice
|
||||||
|
expectChanged bool
|
||||||
|
}{
|
||||||
|
"identical slices, ports only": {
|
||||||
|
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||||
|
initialSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port80},
|
||||||
|
},
|
||||||
|
updatedSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port80},
|
||||||
|
},
|
||||||
|
expectChanged: false,
|
||||||
|
},
|
||||||
|
"identical slices, ports out of order": {
|
||||||
|
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||||
|
initialSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443, port80},
|
||||||
|
},
|
||||||
|
updatedSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port80, port443},
|
||||||
|
},
|
||||||
|
expectChanged: false,
|
||||||
|
},
|
||||||
|
"port removed": {
|
||||||
|
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||||
|
initialSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443, port80},
|
||||||
|
},
|
||||||
|
updatedSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
},
|
||||||
|
expectChanged: true,
|
||||||
|
},
|
||||||
|
"port added": {
|
||||||
|
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||||
|
initialSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
},
|
||||||
|
updatedSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443, port80},
|
||||||
|
},
|
||||||
|
expectChanged: true,
|
||||||
|
},
|
||||||
|
"identical with endpoints": {
|
||||||
|
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||||
|
initialSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
|
||||||
|
},
|
||||||
|
updatedSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
|
||||||
|
},
|
||||||
|
expectChanged: false,
|
||||||
|
},
|
||||||
|
"identical with endpoints out of order": {
|
||||||
|
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||||
|
initialSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
Endpoints: []discovery.Endpoint{endpoint1, endpoint2},
|
||||||
|
},
|
||||||
|
updatedSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
Endpoints: []discovery.Endpoint{endpoint2, endpoint1},
|
||||||
|
},
|
||||||
|
expectChanged: false,
|
||||||
|
},
|
||||||
|
"identical with endpoint added": {
|
||||||
|
cache: NewEndpointSliceCache("", nil, nil, nil),
|
||||||
|
initialSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
Endpoints: []discovery.Endpoint{endpoint1},
|
||||||
|
},
|
||||||
|
updatedSlice: &discovery.EndpointSlice{
|
||||||
|
ObjectMeta: objMeta,
|
||||||
|
Ports: []discovery.EndpointPort{port443},
|
||||||
|
Endpoints: []discovery.Endpoint{endpoint2, endpoint1},
|
||||||
|
},
|
||||||
|
expectChanged: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, tc := range testCases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
if tc.initialSlice != nil {
|
||||||
|
tc.cache.updatePending(tc.initialSlice, false)
|
||||||
|
tc.cache.checkoutChanges()
|
||||||
|
}
|
||||||
|
|
||||||
|
serviceKey, sliceKey, err := endpointSliceCacheKeys(tc.updatedSlice)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
esInfo := newEndpointSliceInfo(tc.updatedSlice, false)
|
||||||
|
changed := tc.cache.esInfoChanged(serviceKey, sliceKey, esInfo)
|
||||||
|
|
||||||
|
if tc.expectChanged != changed {
|
||||||
|
t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed)
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user