Fix EndpointSliceCache::getEndpointsMap for different endpoints with same IP

This commit is contained in:
arkbriar 2021-05-24 12:40:41 +08:00 committed by cangzhen.dsj
parent 2112bddae1
commit f9cb68a2b1
2 changed files with 86 additions and 25 deletions

View File

@ -89,7 +89,7 @@ type endpointInfo struct {
}
// spToEndpointMap stores groups Endpoint objects by ServicePortName and
// IP address.
// endpoint string (returned by Endpoint.String()).
type spToEndpointMap map[ServicePortName]map[string]Endpoint
// NewEndpointSliceCache initializes an EndpointSliceCache.
@ -251,20 +251,20 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
Protocol: *port.Protocol,
}
endpointInfoBySP[svcPortName] = cache.addEndpointsByIP(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
endpointInfoBySP[svcPortName] = cache.addEndpoints(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
}
}
return endpointInfoBySP
}
// addEndpointsByIP adds endpointInfo for each IP.
func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName, portNum int, endpointsByIP map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
if endpointsByIP == nil {
endpointsByIP = map[string]Endpoint{}
// addEndpoints adds endpointInfo for each unique endpoint.
func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
if endpointSet == nil {
endpointSet = map[string]Endpoint{}
}
// iterate through endpoints to add them to endpointsByIP.
// iterate through endpoints to add them to endpointSet.
for _, endpoint := range endpoints {
if len(endpoint.Addresses) == 0 {
klog.Warningf("ignoring invalid endpoint port %s with empty addresses", endpoint)
@ -290,15 +290,15 @@ func (cache *EndpointSliceCache) addEndpointsByIP(serviceNN types.NamespacedName
endpointInfo := newBaseEndpointInfo(endpoint.Addresses[0], portNum, isLocal, endpoint.Topology,
endpoint.Ready, endpoint.Serving, endpoint.Terminating, endpoint.ZoneHints)
// This logic ensures we're deduping potential overlapping endpoints
// isLocal should not vary between matching IPs, but if it does, we
// This logic ensures we're deduplicating potential overlapping endpoints
// isLocal should not vary between matching endpoints, but if it does, we
// favor a true value here if it exists.
if _, exists := endpointsByIP[endpointInfo.IP()]; !exists || isLocal {
endpointsByIP[endpointInfo.IP()] = cache.makeEndpointInfo(endpointInfo)
if _, exists := endpointSet[endpointInfo.String()]; !exists || isLocal {
endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo)
}
}
return endpointsByIP
return endpointSet
}
func (cache *EndpointSliceCache) isLocal(hostname string) bool {
@ -341,15 +341,15 @@ func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[strin
endpointsMap := EndpointsMap{}
// transform endpointInfoByServicePort into an endpointsMap with sorted IPs.
for svcPortName, endpointInfoByIP := range endpointInfoBySP {
if len(endpointInfoByIP) > 0 {
for svcPortName, endpointSet := range endpointInfoBySP {
if len(endpointSet) > 0 {
endpointsMap[svcPortName] = []Endpoint{}
for _, endpointInfo := range endpointInfoByIP {
for _, endpointInfo := range endpointSet {
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], endpointInfo)
}
// Ensure IPs are always returned in the same order to simplify diffing.
sort.Sort(byIP(endpointsMap[svcPortName]))
// Ensure endpoints are always returned in the same order to simplify diffing.
sort.Sort(byEndpoint(endpointsMap[svcPortName]))
klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName]))
}
@ -392,16 +392,16 @@ func (e byAddress) Less(i, j int) bool {
return strings.Join(e[i].Addresses, ",") < strings.Join(e[j].Addresses, ",")
}
// byIP helps sort endpoints by IP
type byIP []Endpoint
// byEndpoint helps sort endpoints by endpoint string.
type byEndpoint []Endpoint
func (e byIP) Len() int {
func (e byEndpoint) Len() int {
return len(e)
}
func (e byIP) Swap(i, j int) {
func (e byEndpoint) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
func (e byIP) Less(i, j int) bool {
func (e byEndpoint) Less(i, j int) bool {
return e[i].String() < e[j].String()
}

View File

@ -183,6 +183,23 @@ func TestEndpointsMapFromESC(t *testing.T) {
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{},
},
// Make sure that different endpoints with duplicate IPs are returned correctly.
"Different endpoints with duplicate IPs should not be filtered": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
hostname: "host1",
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSliceWithOffset("svc1", "ns1", 1, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSliceWithOffset("svc1", "ns1", 2, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(8080)}),
},
expectedMap: map[ServicePortName][]*BaseEndpointInfo{
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
&BaseEndpointInfo{Endpoint: "10.0.1.1:80", IsLocal: false, Ready: true, Serving: true, Terminating: false},
&BaseEndpointInfo{Endpoint: "10.0.1.1:8080", IsLocal: false, Ready: true, Serving: true, Terminating: false},
&BaseEndpointInfo{Endpoint: "10.0.1.2:80", IsLocal: true, Ready: true, Serving: true, Terminating: false},
&BaseEndpointInfo{Endpoint: "10.0.1.2:8080", IsLocal: true, Ready: true, Serving: true, Terminating: false},
},
},
},
}
for name, tc := range testCases {
@ -215,7 +232,7 @@ func TestEndpointInfoByServicePort(t *testing.T) {
},
expectedMap: spToEndpointMap{
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
"10.0.1.1": &BaseEndpointInfo{
"10.0.1.1:80": &BaseEndpointInfo{
Endpoint: "10.0.1.1:80",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
@ -223,7 +240,7 @@ func TestEndpointInfoByServicePort(t *testing.T) {
Serving: true,
Terminating: false,
},
"10.0.1.2": &BaseEndpointInfo{
"10.0.1.2:80": &BaseEndpointInfo{
Endpoint: "10.0.1.2:80",
IsLocal: true,
Topology: map[string]string{"kubernetes.io/hostname": "host1"},
@ -231,7 +248,7 @@ func TestEndpointInfoByServicePort(t *testing.T) {
Serving: true,
Terminating: false,
},
"10.0.1.3": &BaseEndpointInfo{
"10.0.1.3:80": &BaseEndpointInfo{
Endpoint: "10.0.1.3:80",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
@ -242,6 +259,50 @@ func TestEndpointInfoByServicePort(t *testing.T) {
},
},
},
"4 different slices with duplicate IPs": {
namespacedName: types.NamespacedName{Name: "svc1", Namespace: "ns1"},
hostname: "host1",
endpointSlices: []*discovery.EndpointSlice{
generateEndpointSliceWithOffset("svc1", "ns1", 1, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(80)}),
generateEndpointSliceWithOffset("svc1", "ns1", 2, 1, 2, 999, 999, []string{"host1", "host2"}, []*int32{utilpointer.Int32Ptr(8080)}),
},
expectedMap: spToEndpointMap{
makeServicePortName("ns1", "svc1", "port-0", v1.ProtocolTCP): {
"10.0.1.1:80": &BaseEndpointInfo{
Endpoint: "10.0.1.1:80",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.2:80": &BaseEndpointInfo{
Endpoint: "10.0.1.2:80",
IsLocal: true,
Topology: map[string]string{"kubernetes.io/hostname": "host1"},
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.1:8080": &BaseEndpointInfo{
Endpoint: "10.0.1.1:8080",
IsLocal: false,
Topology: map[string]string{"kubernetes.io/hostname": "host2"},
Ready: true,
Serving: true,
Terminating: false,
},
"10.0.1.2:8080": &BaseEndpointInfo{
Endpoint: "10.0.1.2:8080",
IsLocal: true,
Topology: map[string]string{"kubernetes.io/hostname": "host1"},
Ready: true,
Serving: true,
Terminating: false,
},
},
},
},
}
for name, tc := range testCases {