From f9cb68a2b13f5322005f4f938da3e9ee35f04372 Mon Sep 17 00:00:00 2001 From: arkbriar Date: Mon, 24 May 2021 12:40:41 +0800 Subject: [PATCH] Fix EndpointSliceCache::getEndpointsMap for different endpoints with same IP --- pkg/proxy/endpointslicecache.go | 44 +++++++++--------- pkg/proxy/endpointslicecache_test.go | 67 ++++++++++++++++++++++++++-- 2 files changed, 86 insertions(+), 25 deletions(-) diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index ec3bb8b3292..58f8ef3fbc0 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -89,7 +89,7 @@ type endpointInfo struct { } // spToEndpointMap stores groups Endpoint objects by ServicePortName and -// IP address. +// endpoint string (returned by Endpoint.String()). type spToEndpointMap map[ServicePortName]map[string]Endpoint // NewEndpointSliceCache initializes an EndpointSliceCache. @@ -251,20 +251,20 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names Protocol: *port.Protocol, } - endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints) + endpointInfoBySP[svcPortName] = cache.addEndpoints(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints) } } return endpointInfoBySP } -// addEndpointsByIP adds endpointInfo for each IP. -func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName, portNum int, endpointsByIP map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint { - if endpointsByIP == nil { - endpointsByIP = map[string]Endpoint{} +// addEndpoints adds endpointInfo for each unique endpoint. +func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint { + if endpointSet == nil { + endpointSet = map[string]Endpoint{} } - // iterate through endpoints to add them to endpointsByIP. + // iterate through endpoints to add them to endpointSet. for _, endpoint := range endpoints { if len(endpoint.Addresses) == 0 { klog.Warningf("ignoring invalid endpoint port %s with empty addresses", endpoint) @@ -290,15 +290,15 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology, endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints) - // This logic ensures we're deduping potential overlapping endpoints - // isLocal should not vary between matching IPs, but if it does, we + // This logic ensures we're deduplicating potential overlapping endpoints + // isLocal should not vary between matching endpoints, but if it does, we // favor a true value here if it exists. - if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal { - endpointsByIP[endpointInfo.IP()] = cache.makeEndpointInfo(endpointInfo) + if _, exists := endpointSet[endpointInfo.String()]; !exists || isLocal { + endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo) } } - return endpointsByIP + return endpointSet } func (cache *EndpointSliceCache) isLocal(hostname string) bool { @@ -341,15 +341,15 @@ func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[strin endpointsMap := EndpointsMap{} // transform endpointInfoByServicePort into an endpointsMap with sorted IPs. - for svcPortName, endpointInfoByIP := range endpointInfoBySP { - if len(endpointInfoByIP) > 0 { + for svcPortName, endpointSet := range endpointInfoBySP { + if len(endpointSet) > 0 { endpointsMap[svcPortName] = []Endpoint{} - for _, endpointInfo := range endpointInfoByIP { + for _, endpointInfo := range endpointSet { endpointsMap[svcPortName] = append(endpointsMap[svcPortName], endpointInfo) } - // Ensure IPs are always returned in the same order to simplify diffing. - sort.Sort(byIP(endpointsMap[svcPortName])) + // Ensure endpoints are always returned in the same order to simplify diffing. + sort.Sort(byEndpoint(endpointsMap[svcPortName])) klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName])) } @@ -392,16 +392,16 @@ func (e byAddress) Less(i, j int) bool { return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",") } -// byIP helps sort endpoints by IP -type byIP []Endpoint +// byEndpoint helps sort endpoints by endpoint string. +type byEndpoint []Endpoint -func (e byIP) Len() int { +func (e byEndpoint) Len() int { return len(e) } -func (e byIP) Swap(i, j int) { +func (e byEndpoint) Swap(i, j int) { e[i], e[j] = e[j], e[i] } -func (e byIP) Less(i, j int) bool { +func (e byEndpoint) Less(i, j int) bool { return e[i].String() < e[j].String() } diff --git a/pkg/proxy/endpointslicecache_test.go b/pkg/proxy/endpointslicecache_test.go index 256c9878e5a..360c23a6e6f 100644 --- a/pkg/proxy/endpointslicecache_test.go +++ b/pkg/proxy/endpointslicecache_test.go @@ -183,6 +183,23 @@ func TestEndpointsMapFromESC(t *testing.T) { }, expectedMap: map[ServicePortName][]*BaseEndpointInfo{}, }, + // Make sure that different endpoints with duplicate IPs are returned correctly. + "Different endpoints with duplicate IPs should not be filtered": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + hostname: "host1", + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSliceWithOffset("svc1", "ns1", 1, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSliceWithOffset("svc1", "ns1", 2, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(8080)}), + }, + expectedMap: map[ServicePortName][]*BaseEndpointInfo{ + makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { + &BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + &BaseEndpointInfo{Endpoint: "10.0.1.1:8080", IsLocal: false, Ready: true, Serving: true, Terminating: false}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + &BaseEndpointInfo{Endpoint: "10.0.1.2:8080", IsLocal: true, Ready: true, Serving: true, Terminating: false}, + }, + }, + }, } for name, tc := range testCases { @@ -215,7 +232,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { }, expectedMap: spToEndpointMap{ makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { - "10.0.1.1": &BaseEndpointInfo{ + "10.0.1.1:80": &BaseEndpointInfo{ Endpoint: "10.0.1.1:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}, @@ -223,7 +240,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { Serving: true, Terminating: false, }, - "10.0.1.2": &BaseEndpointInfo{ + "10.0.1.2:80": &BaseEndpointInfo{ Endpoint: "10.0.1.2:80", IsLocal: true, Topology: map[string]string{"kubernetes.io/hostname": "host1"}, @@ -231,7 +248,7 @@ func TestEndpointInfoByServicePort(t *testing.T) { Serving: true, Terminating: false, }, - "10.0.1.3": &BaseEndpointInfo{ + "10.0.1.3:80": &BaseEndpointInfo{ Endpoint: "10.0.1.3:80", IsLocal: false, Topology: map[string]string{"kubernetes.io/hostname": "host2"}, @@ -242,6 +259,50 @@ func TestEndpointInfoByServicePort(t *testing.T) { }, }, }, + "4 different slices with duplicate IPs": { + namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"}, + hostname: "host1", + endpointSlices: []*discovery.EndpointSlice{ + generateEndpointSliceWithOffset("svc1", "ns1", 1, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80)}), + generateEndpointSliceWithOffset("svc1", "ns1", 2, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(8080)}), + }, + expectedMap: spToEndpointMap{ + makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): { + "10.0.1.1:80": &BaseEndpointInfo{ + Endpoint: "10.0.1.1:80", + IsLocal: false, + Topology: map[string]string{"kubernetes.io/hostname": "host2"}, + Ready: true, + Serving: true, + Terminating: false, + }, + "10.0.1.2:80": &BaseEndpointInfo{ + Endpoint: "10.0.1.2:80", + IsLocal: true, + Topology: map[string]string{"kubernetes.io/hostname": "host1"}, + Ready: true, + Serving: true, + Terminating: false, + }, + "10.0.1.1:8080": &BaseEndpointInfo{ + Endpoint: "10.0.1.1:8080", + IsLocal: false, + Topology: map[string]string{"kubernetes.io/hostname": "host2"}, + Ready: true, + Serving: true, + Terminating: false, + }, + "10.0.1.2:8080": &BaseEndpointInfo{ + Endpoint: "10.0.1.2:8080", + IsLocal: true, + Topology: map[string]string{"kubernetes.io/hostname": "host1"}, + Ready: true, + Serving: true, + Terminating: false, + }, + }, + }, + }, } for name, tc := range testCases {