mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
proxy/iptables: fix up endpoint chain name computation
Rather than lazily computing and then caching the endpoint chain name because we don't have the right information at construct time, just pass the right information at construct time and compute the chain name then.
This commit is contained in:
parent
3a4064c5c8
commit
dd4d88398c
@ -150,7 +150,7 @@ func newBaseEndpointInfo(IP, nodeName, zone string, port int, isLocal bool,
|
||||
}
|
||||
}
|
||||
|
||||
type makeEndpointFunc func(info *BaseEndpointInfo) Endpoint
|
||||
type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
|
||||
|
||||
// This handler is invoked by the apply function on every change. This function should not modify the
|
||||
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
|
||||
@ -462,7 +462,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
|
||||
// Zone information is only supported with EndpointSlice API
|
||||
baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints)
|
||||
if ect.makeEndpointInfo != nil {
|
||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
|
||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo, &svcPortName))
|
||||
} else {
|
||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo)
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *
|
||||
}
|
||||
|
||||
// standardEndpointInfo is the default makeEndpointFunc.
|
||||
func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
|
||||
func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint {
|
||||
return ep
|
||||
}
|
||||
|
||||
@ -250,7 +250,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
|
||||
Protocol: *port.Protocol,
|
||||
}
|
||||
|
||||
endpointInfoBySP[svcPortName] = cache.addEndpoints(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
|
||||
endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
|
||||
}
|
||||
}
|
||||
|
||||
@ -258,7 +258,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
|
||||
}
|
||||
|
||||
// addEndpoints adds endpointInfo for each unique endpoint.
|
||||
func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
|
||||
func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
|
||||
if endpointSet == nil {
|
||||
endpointSet = map[string]Endpoint{}
|
||||
}
|
||||
@ -275,7 +275,7 @@ func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, po
|
||||
if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
|
||||
// Emit event on the corresponding service which had a different IP
|
||||
// version than the endpoint.
|
||||
utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], serviceNN.Namespace, serviceNN.Name, "")
|
||||
utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "")
|
||||
continue
|
||||
}
|
||||
|
||||
@ -298,7 +298,7 @@ func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, po
|
||||
// isLocal should not vary between matching endpoints, but if it does, we
|
||||
// favor a true value here if it exists.
|
||||
if _, exists := endpointSet[endpointInfo.String()]; !exists || isLocal {
|
||||
endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo)
|
||||
endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo, svcPortName)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -142,16 +142,16 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.B
|
||||
// internal struct for endpoints information
|
||||
type endpointsInfo struct {
|
||||
*proxy.BaseEndpointInfo
|
||||
// The following fields we lazily compute and store here for performance
|
||||
// reasons. If the protocol is the same as you expect it to be, then the
|
||||
// chainName can be reused, otherwise it should be recomputed.
|
||||
protocol string
|
||||
chainName utiliptables.Chain
|
||||
|
||||
ChainName utiliptables.Chain
|
||||
}
|
||||
|
||||
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
||||
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint {
|
||||
return &endpointsInfo{BaseEndpointInfo: baseInfo}
|
||||
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
|
||||
return &endpointsInfo{
|
||||
BaseEndpointInfo: baseInfo,
|
||||
ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.Endpoint),
|
||||
}
|
||||
}
|
||||
|
||||
// Equal overrides the Equal() function implemented by proxy.BaseEndpointInfo.
|
||||
@ -163,20 +163,10 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
|
||||
}
|
||||
return e.Endpoint == o.Endpoint &&
|
||||
e.IsLocal == o.IsLocal &&
|
||||
e.protocol == o.protocol &&
|
||||
e.chainName == o.chainName &&
|
||||
e.ChainName == o.ChainName &&
|
||||
e.Ready == o.Ready
|
||||
}
|
||||
|
||||
// Returns the endpoint chain name for a given endpointsInfo.
|
||||
func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain {
|
||||
if e.protocol != protocol {
|
||||
e.protocol = protocol
|
||||
e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.Endpoint)
|
||||
}
|
||||
return e.chainName
|
||||
}
|
||||
|
||||
// Proxier is an iptables based proxy for connections between a localhost:lport
|
||||
// and services that provide the actual backends.
|
||||
type Proxier struct {
|
||||
@ -1026,7 +1016,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
|
||||
endpointChain := epInfo.endpointChain(svcNameString, protocol)
|
||||
endpointChain := epInfo.ChainName
|
||||
endpointInUse := false
|
||||
|
||||
if epInfo.Ready {
|
||||
|
@ -2934,7 +2934,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
|
||||
proxier.servicesSynced = true
|
||||
}
|
||||
|
||||
func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
|
||||
func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
|
||||
if len(newMap) != len(expected) {
|
||||
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
|
||||
}
|
||||
@ -2949,9 +2949,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe
|
||||
continue
|
||||
}
|
||||
if newEp.Endpoint != expected[x][i].Endpoint ||
|
||||
newEp.IsLocal != expected[x][i].IsLocal ||
|
||||
newEp.protocol != expected[x][i].protocol ||
|
||||
newEp.chainName != expected[x][i].chainName {
|
||||
newEp.IsLocal != expected[x][i].IsLocal {
|
||||
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
|
||||
}
|
||||
}
|
||||
@ -3717,7 +3715,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
}
|
||||
}
|
||||
fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
||||
compareEndpointsMapsExceptChainName(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
||||
|
||||
// Now let's call appropriate handlers to get to state we want to be.
|
||||
if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
|
||||
@ -3738,7 +3736,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
||||
}
|
||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||
newMap := fp.endpointsMap
|
||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
||||
compareEndpointsMapsExceptChainName(t, tci, newMap, tc.expectedResult)
|
||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
||||
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
|
||||
}
|
||||
|
@ -377,7 +377,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
|
||||
}
|
||||
|
||||
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
||||
func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint {
|
||||
func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint {
|
||||
|
||||
portNumber, err := baseInfo.Port()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user