diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index aa673198f59..7916c88b215 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -92,7 +92,7 @@ type EndpointChangeTracker struct { items map[types.NamespacedName]*endpointsChange // makeEndpointInfo allows proxier to inject customized information when processing endpoint. makeEndpointInfo makeEndpointFunc - // endpointSliceCache holds a simplified version of endpoint slices + // endpointSliceCache holds a simplified version of endpoint slices. endpointSliceCache *EndpointSliceCache // isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable. isIPv6Mode *bool @@ -190,39 +190,54 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E ect.lock.Lock() defer ect.lock.Unlock() - change, ok := ect.items[namespacedName] - if !ok { - change = &endpointsChange{} - change.previous = ect.endpointSliceCache.EndpointsMap(namespacedName) - ect.items[namespacedName] = change + changeNeeded := ect.endpointSliceCache.updatePending(endpointSlice, removeSlice) + + if changeNeeded { + metrics.EndpointChangesPending.Inc() + if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() { + ect.lastChangeTriggerTimes[namespacedName] = + append(ect.lastChangeTriggerTimes[namespacedName], t) + } } - if removeSlice { - ect.endpointSliceCache.Delete(endpointSlice) - } else { - ect.endpointSliceCache.Update(endpointSlice) + return changeNeeded +} + +// checkoutChanges returns a list of pending endpointsChanges and marks them as +// applied. +func (ect *EndpointChangeTracker) checkoutChanges() []*endpointsChange { + ect.lock.Lock() + defer ect.lock.Unlock() + + metrics.EndpointChangesPending.Set(0) + + if ect.endpointSliceCache != nil { + return ect.endpointSliceCache.checkoutChanges() } - if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() { - ect.lastChangeTriggerTimes[namespacedName] = - append(ect.lastChangeTriggerTimes[namespacedName], t) + changes := []*endpointsChange{} + for _, change := range ect.items { + changes = append(changes, change) } + ect.items = make(map[types.NamespacedName]*endpointsChange) + return changes +} - change.current = ect.endpointSliceCache.EndpointsMap(namespacedName) - // if change.previous equal to change.current, it means no change - if reflect.DeepEqual(change.previous, change.current) { - delete(ect.items, namespacedName) - // Reset the lastChangeTriggerTimes for this service. Given that the network programming - // SLI is defined as the duration between a time of an event and a time when the network was - // programmed to incorporate that event, if there are events that happened between two - // consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted, - // there will be no network programming for them and thus no network programming latency metric - // should be exported. - delete(ect.lastChangeTriggerTimes, namespacedName) +// checkoutTriggerTimes applies the locally cached trigger times to a map of +// trigger times that have been passed in and empties the local cache. +func (ect *EndpointChangeTracker) checkoutTriggerTimes(lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { + ect.lock.Lock() + defer ect.lock.Unlock() + + for k, v := range ect.lastChangeTriggerTimes { + prev, ok := (*lastChangeTriggerTimes)[k] + if !ok { + (*lastChangeTriggerTimes)[k] = v + } else { + (*lastChangeTriggerTimes)[k] = append(prev, v...) + } } - - metrics.EndpointChangesPending.Set(float64(len(ect.items))) - return len(ect.items) > 0 + ect.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) } // getLastChangeTriggerTime returns the time.Time value of the @@ -351,29 +366,19 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint // The changes map is cleared after applying them. // In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints // that were changed and will result in syncing the proxy rules. -func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, +func (em EndpointsMap) apply(ect *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { - if changes == nil { + if ect == nil { return } - changes.lock.Lock() - defer changes.lock.Unlock() - for _, change := range changes.items { + + changes := ect.checkoutChanges() + for _, change := range changes { em.unmerge(change.previous) em.merge(change.current) detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames) } - changes.items = make(map[types.NamespacedName]*endpointsChange) - metrics.EndpointChangesPending.Set(0) - for k, v := range changes.lastChangeTriggerTimes { - prev, ok := (*lastChangeTriggerTimes)[k] - if !ok { - (*lastChangeTriggerTimes)[k] = v - } else { - (*lastChangeTriggerTimes)[k] = append(prev, v...) - } - } - changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) + ect.checkoutTriggerTimes(lastChangeTriggerTimes) } // Merge ensures that the current EndpointsMap contains all pairs from the EndpointsMap passed in. diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 0426bb650c7..db35ca83f5e 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -1630,27 +1630,147 @@ func TestEndpointSliceUpdate(t *testing.T) { } for name, tc := range testCases { - for _, startingSlice := range tc.startingSlices { - tc.endpointChangeTracker.endpointSliceCache.Update(startingSlice) - } + t.Run(name, func(t *testing.T) { + initializeCache(tc.endpointChangeTracker.endpointSliceCache, tc.startingSlices) - got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice) - if !reflect.DeepEqual(got, tc.expectedReturnVal) { - t.Errorf("[%s] EndpointSliceUpdate return value got: %v, want %v", name, got, tc.expectedReturnVal) - } - if tc.endpointChangeTracker.items == nil { - t.Errorf("[%s] Expected ect.items to not be nil", name) - } - if tc.expectedCurrentChange == nil { - if tc.endpointChangeTracker.items[tc.namespacedName] != nil { - t.Errorf("[%s] Expected ect.items[%s] to be nil", name, tc.namespacedName) + got := tc.endpointChangeTracker.EndpointSliceUpdate(tc.paramEndpointSlice, tc.paramRemoveSlice) + if !reflect.DeepEqual(got, tc.expectedReturnVal) { + t.Errorf("EndpointSliceUpdate return value got: %v, want %v", got, tc.expectedReturnVal) } - } else { - if tc.endpointChangeTracker.items[tc.namespacedName] == nil { - t.Errorf("[%s] Expected ect.items[%s] to not be nil", name, tc.namespacedName) + if tc.endpointChangeTracker.items == nil { + t.Errorf("Expected ect.items to not be nil") } - compareEndpointsMapsStr(t, tc.endpointChangeTracker.items[tc.namespacedName].current, tc.expectedCurrentChange) - } + changes := tc.endpointChangeTracker.checkoutChanges() + if tc.expectedCurrentChange == nil { + if len(changes) != 0 { + t.Errorf("Expected %s to have no changes", tc.namespacedName) + } + } else { + if len(changes) == 0 || changes[0] == nil { + t.Fatalf("Expected %s to have changes", tc.namespacedName) + } + compareEndpointsMapsStr(t, changes[0].current, tc.expectedCurrentChange) + } + }) + } +} + +func TestCheckoutChanges(t *testing.T) { + svcPortName0 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-0", v1.ProtocolTCP} + svcPortName1 := ServicePortName{types.NamespacedName{Namespace: "ns1", Name: "svc1"}, "port-1", v1.ProtocolTCP} + + testCases := map[string]struct { + endpointChangeTracker *EndpointChangeTracker + expectedChanges []*endpointsChange + useEndpointSlices bool + items map[types.NamespacedName]*endpointsChange + appliedSlices []*discovery.EndpointSlice + pendingSlices []*discovery.EndpointSlice + }{ + "empty slices": { + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), + expectedChanges: []*endpointsChange{}, + useEndpointSlices: true, + appliedSlices: []*discovery.EndpointSlice{}, + pendingSlices: []*discovery.EndpointSlice{}, + }, + "without slices, empty items": { + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false), + expectedChanges: []*endpointsChange{}, + items: map[types.NamespacedName]*endpointsChange{}, + useEndpointSlices: false, + }, + "without slices, simple items": { + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, false), + expectedChanges: []*endpointsChange{{ + previous: EndpointsMap{ + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")}, + }, + current: EndpointsMap{ + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + }, + }}, + items: map[types.NamespacedName]*endpointsChange{ + {Namespace: "ns1", Name: "svc1"}: { + previous: EndpointsMap{ + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")}, + }, + current: EndpointsMap{ + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + }, + }, + }, + useEndpointSlices: false, + }, + "adding initial slice": { + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), + expectedChanges: []*endpointsChange{{ + previous: EndpointsMap{}, + current: EndpointsMap{ + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + }, + }}, + useEndpointSlices: true, + appliedSlices: []*discovery.EndpointSlice{}, + pendingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}), + }, + }, + "removing port in update": { + endpointChangeTracker: NewEndpointChangeTracker("", nil, nil, nil, true), + expectedChanges: []*endpointsChange{{ + previous: EndpointsMap{ + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + svcPortName1: []Endpoint{newTestEp("10.0.1.1:443"), newTestEp("10.0.1.2:443")}, + }, + current: EndpointsMap{ + svcPortName0: []Endpoint{newTestEp("10.0.1.1:80"), newTestEp("10.0.1.2:80")}, + }, + }}, + useEndpointSlices: true, + appliedSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80), utilpointer.Int32Ptr(443)}), + }, + pendingSlices: []*discovery.EndpointSlice{ + generateEndpointSlice("svc1", "ns1", 1, 3, 3, []string{"host1"}, []*int32{utilpointer.Int32Ptr(80)}), + }, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + if tc.useEndpointSlices { + for _, slice := range tc.appliedSlices { + tc.endpointChangeTracker.EndpointSliceUpdate(slice, false) + } + tc.endpointChangeTracker.checkoutChanges() + for _, slice := range tc.pendingSlices { + tc.endpointChangeTracker.EndpointSliceUpdate(slice, false) + } + } else { + tc.endpointChangeTracker.items = tc.items + } + + changes := tc.endpointChangeTracker.checkoutChanges() + + if len(tc.expectedChanges) != len(changes) { + t.Fatalf("Expected %d changes, got %d", len(tc.expectedChanges), len(changes)) + } + + for i, change := range changes { + expectedChange := tc.expectedChanges[i] + + if !reflect.DeepEqual(change.previous, expectedChange.previous) { + t.Errorf("[%d] Expected change.previous: %+v, got: %+v", i, expectedChange.previous, change.previous) + } + + if !reflect.DeepEqual(change.current, expectedChange.current) { + t.Errorf("[%d] Expected change.current: %+v, got: %+v", i, expectedChange.current, change.current) + } + } + }) } } @@ -1679,3 +1799,18 @@ func compareEndpointsMapsStr(t *testing.T, newMap EndpointsMap, expected map[Ser } } } + +func newTestEp(ep string) *BaseEndpointInfo { + return &BaseEndpointInfo{Endpoint: ep} +} + +func initializeCache(endpointSliceCache *EndpointSliceCache, endpointSlices []*discovery.EndpointSlice) { + for _, endpointSlice := range endpointSlices { + endpointSliceCache.updatePending(endpointSlice, false) + } + + for _, tracker := range endpointSliceCache.trackerByServiceMap { + tracker.applied = tracker.pending + tracker.pending = endpointSliceInfoByName{} + } +} diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index 298b30d748a..07dc637757b 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -18,7 +18,10 @@ package proxy import ( "fmt" + "reflect" "sort" + "strings" + "sync" "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1alpha1" @@ -31,24 +34,41 @@ import ( // EndpointSliceCache is used as a cache of EndpointSlice information. type EndpointSliceCache struct { - // sliceByServiceMap is the basis of this cache. It contains endpoint slice - // info grouped by service name and endpoint slice name. The first key - // represents a namespaced service name while the second key represents + // lock protects trackerByServiceMap. + lock sync.Mutex + + // trackerByServiceMap is the basis of this cache. It contains endpoint + // slice trackers grouped by service name and endpoint slice name. The first + // key represents a namespaced service name while the second key represents // an endpoint slice name. Since endpoints can move between slices, we // require slice specific caching to prevent endpoints being removed from // the cache when they may have just moved to a different slice. - sliceByServiceMap map[types.NamespacedName]map[string]*endpointSliceInfo - makeEndpointInfo makeEndpointFunc - hostname string - isIPv6Mode *bool - recorder record.EventRecorder + trackerByServiceMap map[types.NamespacedName]*endpointSliceTracker + + makeEndpointInfo makeEndpointFunc + hostname string + isIPv6Mode *bool + recorder record.EventRecorder } +// endpointSliceTracker keeps track of EndpointSlices as they have been applied +// by a proxier along with any pending EndpointSlices that have been updated +// in this cache but not yet applied by a proxier. +type endpointSliceTracker struct { + applied endpointSliceInfoByName + pending endpointSliceInfoByName +} + +// endpointSliceInfoByName groups endpointSliceInfo by the names of the +// corresponding EndpointSlices. +type endpointSliceInfoByName map[string]*endpointSliceInfo + // endpointSliceInfo contains just the attributes kube-proxy cares about. // Used for caching. Intentionally small to limit memory util. type endpointSliceInfo struct { Ports []discovery.EndpointPort Endpoints []*endpointInfo + Remove bool } // endpointInfo contains just the attributes kube-proxy cares about. @@ -69,69 +89,122 @@ func NewEndpointSliceCache(hostname string, isIPv6Mode *bool, recorder record.Ev makeEndpointInfo = standardEndpointInfo } return &EndpointSliceCache{ - sliceByServiceMap: map[types.NamespacedName]map[string]*endpointSliceInfo{}, - hostname: hostname, - isIPv6Mode: isIPv6Mode, - makeEndpointInfo: makeEndpointInfo, - recorder: recorder, + trackerByServiceMap: map[types.NamespacedName]*endpointSliceTracker{}, + hostname: hostname, + isIPv6Mode: isIPv6Mode, + makeEndpointInfo: makeEndpointInfo, + recorder: recorder, } } +// newEndpointSliceTracker initializes an endpointSliceTracker. +func newEndpointSliceTracker() *endpointSliceTracker { + return &endpointSliceTracker{ + applied: endpointSliceInfoByName{}, + pending: endpointSliceInfoByName{}, + } +} + +// newEndpointSliceInfo generates endpointSliceInfo from an EndpointSlice. +func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *endpointSliceInfo { + esInfo := &endpointSliceInfo{ + Ports: endpointSlice.Ports, + Endpoints: []*endpointInfo{}, + Remove: remove, + } + + sort.Sort(byPort(esInfo.Ports)) + + if !remove { + for _, endpoint := range endpointSlice.Endpoints { + if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true { + esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{ + Addresses: endpoint.Addresses, + Topology: endpoint.Topology, + }) + } + } + + sort.Sort(byAddress(esInfo.Endpoints)) + } + + return esInfo +} + // standardEndpointInfo is the default makeEndpointFunc. func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint { return ep } -// Update a slice in the cache. -func (cache *EndpointSliceCache) Update(endpointSlice *discovery.EndpointSlice) { +// updatePending updates a pending slice in the cache. +func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool { serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) if err != nil { klog.Warningf("Error getting endpoint slice cache keys: %v", err) - return + return false } - esInfo := &endpointSliceInfo{ - Ports: endpointSlice.Ports, - Endpoints: []*endpointInfo{}, + esInfo := newEndpointSliceInfo(endpointSlice, remove) + + cache.lock.Lock() + defer cache.lock.Unlock() + + if _, ok := cache.trackerByServiceMap[serviceKey]; !ok { + cache.trackerByServiceMap[serviceKey] = newEndpointSliceTracker() } - for _, endpoint := range endpointSlice.Endpoints { - if endpoint.Conditions.Ready == nil || *endpoint.Conditions.Ready == true { - esInfo.Endpoints = append(esInfo.Endpoints, &endpointInfo{ - Addresses: endpoint.Addresses, - Topology: endpoint.Topology, - }) + + changed := cache.esInfoChanged(serviceKey, sliceKey, esInfo) + + if changed { + cache.trackerByServiceMap[serviceKey].pending[sliceKey] = esInfo + } + + return changed +} + +// checkoutChanges returns a list of all endpointsChanges that are +// pending and then marks them as applied. +func (cache *EndpointSliceCache) checkoutChanges() []*endpointsChange { + changes := []*endpointsChange{} + + cache.lock.Lock() + defer cache.lock.Unlock() + + for serviceNN, esTracker := range cache.trackerByServiceMap { + if len(esTracker.pending) == 0 { + continue } + + change := &endpointsChange{} + + change.previous = cache.getEndpointsMap(serviceNN, esTracker.applied) + + for name, sliceInfo := range esTracker.pending { + if sliceInfo.Remove { + delete(esTracker.applied, name) + } else { + esTracker.applied[name] = sliceInfo + } + + delete(esTracker.pending, name) + } + + change.current = cache.getEndpointsMap(serviceNN, esTracker.applied) + changes = append(changes, change) } - if _, exists := cache.sliceByServiceMap[serviceKey]; !exists { - cache.sliceByServiceMap[serviceKey] = map[string]*endpointSliceInfo{} - } - cache.sliceByServiceMap[serviceKey][sliceKey] = esInfo + + return changes } -// Delete a slice from the cache. -func (cache *EndpointSliceCache) Delete(endpointSlice *discovery.EndpointSlice) { - serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) - if err != nil { - klog.Warningf("Error getting endpoint slice cache keys: %v", err) - return - } - delete(cache.sliceByServiceMap[serviceKey], sliceKey) -} - -// EndpointsMap computes an EndpointsMap for a given service. -func (cache *EndpointSliceCache) EndpointsMap(serviceNN types.NamespacedName) EndpointsMap { - endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN) +// getEndpointsMap computes an EndpointsMap for a given set of EndpointSlices. +func (cache *EndpointSliceCache) getEndpointsMap(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) EndpointsMap { + endpointInfoBySP := cache.endpointInfoByServicePort(serviceNN, sliceInfoByName) return endpointsMapFromEndpointInfo(endpointInfoBySP) } // endpointInfoByServicePort groups endpoint info by service port name and address. -func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName) spToEndpointMap { +func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.NamespacedName, sliceInfoByName endpointSliceInfoByName) spToEndpointMap { endpointInfoBySP := spToEndpointMap{} - sliceInfoByName, ok := cache.sliceByServiceMap[serviceNN] - - if !ok { - return endpointInfoBySP - } for _, sliceInfo := range sliceInfoByName { for _, port := range sliceInfo.Ports { @@ -198,6 +271,36 @@ func (cache *EndpointSliceCache) isLocal(hostname string) bool { return len(cache.hostname) > 0 && hostname == cache.hostname } +// esInfoChanged returns true if the esInfo parameter should be set as a new +// pending value in the cache. +func (cache *EndpointSliceCache) esInfoChanged(serviceKey types.NamespacedName, sliceKey string, esInfo *endpointSliceInfo) bool { + if _, ok := cache.trackerByServiceMap[serviceKey]; ok { + appliedInfo, appliedOk := cache.trackerByServiceMap[serviceKey].applied[sliceKey] + pendingInfo, pendingOk := cache.trackerByServiceMap[serviceKey].pending[sliceKey] + + // If there's already a pending value, return whether or not this would + // change that. + if pendingOk { + return !reflect.DeepEqual(esInfo, pendingInfo) + } + + // If there's already an applied value, return whether or not this would + // change that. + if appliedOk { + return !reflect.DeepEqual(esInfo, appliedInfo) + } + } + + // If this is marked for removal and does not exist in the cache, no changes + // are necessary. + if esInfo.Remove { + return false + } + + // If not in the cache, and not marked for removal, it should be added. + return true +} + // endpointsMapFromEndpointInfo computes an endpointsMap from endpointInfo that // has been grouped by service port and IP. func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[string]Endpoint) EndpointsMap { @@ -242,6 +345,19 @@ func endpointSliceCacheKeys(endpointSlice *discovery.EndpointSlice) (types.Names return types.NamespacedName{Namespace: endpointSlice.Namespace, Name: serviceName}, endpointSlice.Name, err } +// byAddress helps sort endpointInfo +type byAddress []*endpointInfo + +func (e byAddress) Len() int { + return len(e) +} +func (e byAddress) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} +func (e byAddress) Less(i, j int) bool { + return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",") +} + // byIP helps sort endpoints by IP type byIP []Endpoint @@ -254,3 +370,16 @@ func (e byIP) Swap(i, j int) { func (e byIP) Less(i, j int) bool { return e[i].String() < e[j].String() } + +// byPort helps sort EndpointSlice ports by port number +type byPort []discovery.EndpointPort + +func (p byPort) Len() int { + return len(p) +} +func (p byPort) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} +func (p byPort) Less(i, j int) bool { + return *p[i].Port < *p[j].Port +} diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index ac5268bc0f8..655eab08c69 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -153,10 +153,10 @@ func TestEndpointsMapFromESC(t *testing.T) { esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil) for _, endpointSlice := range tc.endpointSlices { - esCache.Update(endpointSlice) + esCache.updatePending(endpointSlice, false) } - compareEndpointsMapsStr(t, esCache.EndpointsMap(tc.namespacedName), tc.expectedMap) + compareEndpointsMapsStr(t, esCache.getEndpointsMap(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending), tc.expectedMap) }) } } @@ -185,17 +185,153 @@ func TestEndpointInfoByServicePort(t *testing.T) { } for name, tc := range testCases { - esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil) + t.Run(name, func(t *testing.T) { + esCache := NewEndpointSliceCache(tc.hostname, nil, nil, nil) - for _, endpointSlice := range tc.endpointSlices { - esCache.Update(endpointSlice) - } + for _, endpointSlice := range tc.endpointSlices { + esCache.updatePending(endpointSlice, false) + } - got := esCache.endpointInfoByServicePort(tc.namespacedName) - if !reflect.DeepEqual(got, tc.expectedMap) { - t.Errorf("[%s] endpointInfoByServicePort does not match. Want: %+v, Got: %+v", name, tc.expectedMap, got) - } + got := esCache.endpointInfoByServicePort(tc.namespacedName, esCache.trackerByServiceMap[tc.namespacedName].pending) + if !reflect.DeepEqual(got, tc.expectedMap) { + t.Errorf("endpointInfoByServicePort does not match. Want: %+v, Got: %+v", tc.expectedMap, got) + } + }) + } +} +func TestEsInfoChanged(t *testing.T) { + p80 := int32(80) + p443 := int32(443) + tcpProto := v1.ProtocolTCP + port80 := discovery.EndpointPort{Port: &p80, Name: utilpointer.StringPtr("http"), Protocol: &tcpProto} + port443 := discovery.EndpointPort{Port: &p443, Name: utilpointer.StringPtr("https"), Protocol: &tcpProto} + endpoint1 := discovery.Endpoint{Addresses: []string{"10.0.1.0"}} + endpoint2 := discovery.Endpoint{Addresses: []string{"10.0.1.1"}} + + objMeta := metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + Labels: map[string]string{discovery.LabelServiceName: "svc1"}, + } + + testCases := map[string]struct { + cache *EndpointSliceCache + initialSlice *discovery.EndpointSlice + updatedSlice *discovery.EndpointSlice + expectChanged bool + }{ + "identical slices, ports only": { + cache: NewEndpointSliceCache("", nil, nil, nil), + initialSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port80}, + }, + updatedSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port80}, + }, + expectChanged: false, + }, + "identical slices, ports out of order": { + cache: NewEndpointSliceCache("", nil, nil, nil), + initialSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443, port80}, + }, + updatedSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port80, port443}, + }, + expectChanged: false, + }, + "port removed": { + cache: NewEndpointSliceCache("", nil, nil, nil), + initialSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443, port80}, + }, + updatedSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + }, + expectChanged: true, + }, + "port added": { + cache: NewEndpointSliceCache("", nil, nil, nil), + initialSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + }, + updatedSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443, port80}, + }, + expectChanged: true, + }, + "identical with endpoints": { + cache: NewEndpointSliceCache("", nil, nil, nil), + initialSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + Endpoints: []discovery.Endpoint{endpoint1, endpoint2}, + }, + updatedSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + Endpoints: []discovery.Endpoint{endpoint1, endpoint2}, + }, + expectChanged: false, + }, + "identical with endpoints out of order": { + cache: NewEndpointSliceCache("", nil, nil, nil), + initialSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + Endpoints: []discovery.Endpoint{endpoint1, endpoint2}, + }, + updatedSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + Endpoints: []discovery.Endpoint{endpoint2, endpoint1}, + }, + expectChanged: false, + }, + "identical with endpoint added": { + cache: NewEndpointSliceCache("", nil, nil, nil), + initialSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + Endpoints: []discovery.Endpoint{endpoint1}, + }, + updatedSlice: &discovery.EndpointSlice{ + ObjectMeta: objMeta, + Ports: []discovery.EndpointPort{port443}, + Endpoints: []discovery.Endpoint{endpoint2, endpoint1}, + }, + expectChanged: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + if tc.initialSlice != nil { + tc.cache.updatePending(tc.initialSlice, false) + tc.cache.checkoutChanges() + } + + serviceKey, sliceKey, err := endpointSliceCacheKeys(tc.updatedSlice) + if err != nil { + t.Fatalf("Expected no error calling endpointSliceCacheKeys(): %v", err) + } + + esInfo := newEndpointSliceInfo(tc.updatedSlice, false) + changed := tc.cache.esInfoChanged(serviceKey, sliceKey, esInfo) + + if tc.expectChanged != changed { + t.Errorf("Expected esInfoChanged() to return %t, got %t", tc.expectChanged, changed) + } + }) } }