mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
Merge pull request #108812 from danwinship/endpoint-chain-names
proxy/iptables: fix up endpoint chain name computation
This commit is contained in:
commit
475f7af1c1
@ -150,7 +150,7 @@ func newBaseEndpointInfo(IP, nodeName, zone string, port int, isLocal bool,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type makeEndpointFunc func(info *BaseEndpointInfo) Endpoint
|
type makeEndpointFunc func(info *BaseEndpointInfo, svcPortName *ServicePortName) Endpoint
|
||||||
|
|
||||||
// This handler is invoked by the apply function on every change. This function should not modify the
|
// This handler is invoked by the apply function on every change. This function should not modify the
|
||||||
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
|
// EndpointsMap's but just use the changes for any Proxier specific cleanup.
|
||||||
@ -462,7 +462,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
|
|||||||
// Zone information is only supported with EndpointSlice API
|
// Zone information is only supported with EndpointSlice API
|
||||||
baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints)
|
baseEndpointInfo := newBaseEndpointInfo(addr.IP, nodeName, "", int(port.Port), isLocal, isReady, isServing, isTerminating, zoneHints)
|
||||||
if ect.makeEndpointInfo != nil {
|
if ect.makeEndpointInfo != nil {
|
||||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo))
|
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], ect.makeEndpointInfo(baseEndpointInfo, &svcPortName))
|
||||||
} else {
|
} else {
|
||||||
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo)
|
endpointsMap[svcPortName] = append(endpointsMap[svcPortName], baseEndpointInfo)
|
||||||
}
|
}
|
||||||
|
@ -158,7 +158,7 @@ func newEndpointSliceInfo(endpointSlice *discovery.EndpointSlice, remove bool) *
|
|||||||
}
|
}
|
||||||
|
|
||||||
// standardEndpointInfo is the default makeEndpointFunc.
|
// standardEndpointInfo is the default makeEndpointFunc.
|
||||||
func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
|
func standardEndpointInfo(ep *BaseEndpointInfo, _ *ServicePortName) Endpoint {
|
||||||
return ep
|
return ep
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -250,7 +250,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
|
|||||||
Protocol: *port.Protocol,
|
Protocol: *port.Protocol,
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointInfoBySP[svcPortName] = cache.addEndpoints(serviceNN, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
|
endpointInfoBySP[svcPortName] = cache.addEndpoints(&svcPortName, int(*port.Port), endpointInfoBySP[svcPortName], sliceInfo.Endpoints)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -258,7 +258,7 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
|
|||||||
}
|
}
|
||||||
|
|
||||||
// addEndpoints adds endpointInfo for each unique endpoint.
|
// addEndpoints adds endpointInfo for each unique endpoint.
|
||||||
func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
|
func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, portNum int, endpointSet map[string]Endpoint, endpoints []*endpointInfo) map[string]Endpoint {
|
||||||
if endpointSet == nil {
|
if endpointSet == nil {
|
||||||
endpointSet = map[string]Endpoint{}
|
endpointSet = map[string]Endpoint{}
|
||||||
}
|
}
|
||||||
@ -275,7 +275,7 @@ func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, po
|
|||||||
if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
|
if (cache.ipFamily == v1.IPv6Protocol) != utilnet.IsIPv6String(endpoint.Addresses[0]) {
|
||||||
// Emit event on the corresponding service which had a different IP
|
// Emit event on the corresponding service which had a different IP
|
||||||
// version than the endpoint.
|
// version than the endpoint.
|
||||||
utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], serviceNN.Namespace, serviceNN.Name, "")
|
utilproxy.LogAndEmitIncorrectIPVersionEvent(cache.recorder, "endpointslice", endpoint.Addresses[0], svcPortName.NamespacedName.Namespace, svcPortName.NamespacedName.Name, "")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -298,7 +298,7 @@ func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, po
|
|||||||
// isLocal should not vary between matching endpoints, but if it does, we
|
// isLocal should not vary between matching endpoints, but if it does, we
|
||||||
// favor a true value here if it exists.
|
// favor a true value here if it exists.
|
||||||
if _, exists := endpointSet[endpointInfo.String()]; !exists || isLocal {
|
if _, exists := endpointSet[endpointInfo.String()]; !exists || isLocal {
|
||||||
endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo)
|
endpointSet[endpointInfo.String()] = cache.makeEndpointInfo(endpointInfo, svcPortName)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,16 +142,16 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.B
|
|||||||
// internal struct for endpoints information
|
// internal struct for endpoints information
|
||||||
type endpointsInfo struct {
|
type endpointsInfo struct {
|
||||||
*proxy.BaseEndpointInfo
|
*proxy.BaseEndpointInfo
|
||||||
// The following fields we lazily compute and store here for performance
|
|
||||||
// reasons. If the protocol is the same as you expect it to be, then the
|
ChainName utiliptables.Chain
|
||||||
// chainName can be reused, otherwise it should be recomputed.
|
|
||||||
protocol string
|
|
||||||
chainName utiliptables.Chain
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
||||||
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint {
|
func newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, svcPortName *proxy.ServicePortName) proxy.Endpoint {
|
||||||
return &endpointsInfo{BaseEndpointInfo: baseInfo}
|
return &endpointsInfo{
|
||||||
|
BaseEndpointInfo: baseInfo,
|
||||||
|
ChainName: servicePortEndpointChainName(svcPortName.String(), strings.ToLower(string(svcPortName.Protocol)), baseInfo.Endpoint),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Equal overrides the Equal() function implemented by proxy.BaseEndpointInfo.
|
// Equal overrides the Equal() function implemented by proxy.BaseEndpointInfo.
|
||||||
@ -163,20 +163,10 @@ func (e *endpointsInfo) Equal(other proxy.Endpoint) bool {
|
|||||||
}
|
}
|
||||||
return e.Endpoint == o.Endpoint &&
|
return e.Endpoint == o.Endpoint &&
|
||||||
e.IsLocal == o.IsLocal &&
|
e.IsLocal == o.IsLocal &&
|
||||||
e.protocol == o.protocol &&
|
e.ChainName == o.ChainName &&
|
||||||
e.chainName == o.chainName &&
|
|
||||||
e.Ready == o.Ready
|
e.Ready == o.Ready
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns the endpoint chain name for a given endpointsInfo.
|
|
||||||
func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain {
|
|
||||||
if e.protocol != protocol {
|
|
||||||
e.protocol = protocol
|
|
||||||
e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.Endpoint)
|
|
||||||
}
|
|
||||||
return e.chainName
|
|
||||||
}
|
|
||||||
|
|
||||||
// Proxier is an iptables based proxy for connections between a localhost:lport
|
// Proxier is an iptables based proxy for connections between a localhost:lport
|
||||||
// and services that provide the actual backends.
|
// and services that provide the actual backends.
|
||||||
type Proxier struct {
|
type Proxier struct {
|
||||||
@ -1026,7 +1016,7 @@ func (proxier *Proxier) syncProxyRules() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
endpointChain := epInfo.endpointChain(svcNameString, protocol)
|
endpointChain := epInfo.ChainName
|
||||||
endpointInUse := false
|
endpointInUse := false
|
||||||
|
|
||||||
if epInfo.Ready {
|
if epInfo.Ready {
|
||||||
|
@ -2934,7 +2934,7 @@ func makeServiceMap(proxier *Proxier, allServices ...*v1.Service) {
|
|||||||
proxier.servicesSynced = true
|
proxier.servicesSynced = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
|
func compareEndpointsMapsExceptChainName(t *testing.T, tci int, newMap proxy.EndpointsMap, expected map[proxy.ServicePortName][]*endpointsInfo) {
|
||||||
if len(newMap) != len(expected) {
|
if len(newMap) != len(expected) {
|
||||||
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
|
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)
|
||||||
}
|
}
|
||||||
@ -2949,9 +2949,7 @@ func compareEndpointsMaps(t *testing.T, tci int, newMap proxy.EndpointsMap, expe
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if newEp.Endpoint != expected[x][i].Endpoint ||
|
if newEp.Endpoint != expected[x][i].Endpoint ||
|
||||||
newEp.IsLocal != expected[x][i].IsLocal ||
|
newEp.IsLocal != expected[x][i].IsLocal {
|
||||||
newEp.protocol != expected[x][i].protocol ||
|
|
||||||
newEp.chainName != expected[x][i].chainName {
|
|
||||||
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
|
t.Errorf("[%d] expected new[%v][%d] to be %v, got %v", tci, x, i, expected[x][i], newEp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3717,7 +3715,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
fp.endpointsMap.Update(fp.endpointsChanges)
|
fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
compareEndpointsMaps(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
compareEndpointsMapsExceptChainName(t, tci, fp.endpointsMap, tc.oldEndpoints)
|
||||||
|
|
||||||
// Now let's call appropriate handlers to get to state we want to be.
|
// Now let's call appropriate handlers to get to state we want to be.
|
||||||
if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
|
if len(tc.previousEndpoints) != len(tc.currentEndpoints) {
|
||||||
@ -3738,7 +3736,7 @@ func Test_updateEndpointsMap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
result := fp.endpointsMap.Update(fp.endpointsChanges)
|
||||||
newMap := fp.endpointsMap
|
newMap := fp.endpointsMap
|
||||||
compareEndpointsMaps(t, tci, newMap, tc.expectedResult)
|
compareEndpointsMapsExceptChainName(t, tci, newMap, tc.expectedResult)
|
||||||
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
if len(result.StaleEndpoints) != len(tc.expectedStaleEndpoints) {
|
||||||
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
|
t.Errorf("[%d] expected %d staleEndpoints, got %d: %v", tci, len(tc.expectedStaleEndpoints), len(result.StaleEndpoints), result.StaleEndpoints)
|
||||||
}
|
}
|
||||||
|
@ -378,7 +378,7 @@ func (proxier *Proxier) onServiceMapChange(svcPortName *proxy.ServicePortName) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
// returns a new proxy.Endpoint which abstracts a endpointsInfo
|
||||||
func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo) proxy.Endpoint {
|
func (proxier *Proxier) newEndpointInfo(baseInfo *proxy.BaseEndpointInfo, _ *proxy.ServicePortName) proxy.Endpoint {
|
||||||
|
|
||||||
portNumber, err := baseInfo.Port()
|
portNumber, err := baseInfo.Port()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user