proxier/iptables: fallback to terminating endpoints if there are no ready endpoints

Signed-off-by: Andrew Sy Kim <kim.andrewsy@gmail.com>
This commit is contained in:
Andrew Sy Kim 2021-03-10 11:07:44 -05:00
parent be92fc83e2
commit 732635fd4b

View File

@ -1033,16 +1033,7 @@ func (proxier *Proxier) syncProxyRules() {
// Service does not have conflicting configuration such as // Service does not have conflicting configuration such as
// externalTrafficPolicy=Local. // externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
hasEndpoints := len(allEndpoints) > 0
readyEndpoints := make([]proxy.Endpoint, 0, len(allEndpoints))
for _, endpoint := range allEndpoints {
if !endpoint.IsReady() {
continue
}
readyEndpoints = append(readyEndpoints, endpoint)
}
hasEndpoints := len(readyEndpoints) > 0
svcChain := svcInfo.servicePortChainName svcChain := svcInfo.servicePortChainName
if hasEndpoints { if hasEndpoints {
@ -1365,7 +1356,7 @@ func (proxier *Proxier) syncProxyRules() {
endpoints = endpoints[:0] endpoints = endpoints[:0]
endpointChains = endpointChains[:0] endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain var endpointChain utiliptables.Chain
for _, ep := range readyEndpoints { for _, ep := range allEndpoints {
epInfo, ok := ep.(*endpointsInfo) epInfo, ok := ep.(*endpointsInfo)
if !ok { if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String()) klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String())
@ -1401,16 +1392,33 @@ func (proxier *Proxier) syncProxyRules() {
} }
} }
// Now write loadbalancing & DNAT rules. // Firstly, categorize each endpoint into three buckets:
n := len(endpointChains) // 1. all endpoints that are ready and NOT terminating.
localEndpointChains := make([]utiliptables.Chain, 0) // 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local
// 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local
readyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains))
readyEndpoints := make([]*endpointsInfo, 0, len(allEndpoints))
localReadyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains))
localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains))
for i, endpointChain := range endpointChains { for i, endpointChain := range endpointChains {
// Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. if endpoints[i].Ready {
if svcInfo.NodeLocalExternal() && endpoints[i].IsLocal { readyEndpointChains = append(readyEndpointChains, endpointChain)
localEndpointChains = append(localEndpointChains, endpointChains[i]) readyEndpoints = append(readyEndpoints, endpoints[i])
} }
epIP := endpoints[i].IP() if svc.NodeLocalExternal() && endpoints[i].IsLocal {
if endpoints[i].Ready {
localReadyEndpointChains = append(localReadyEndpointChains, endpointChain)
} else if endpoints[i].Serving && endpoints[i].Terminating {
localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain)
}
}
}
// Now write loadbalancing & DNAT rules.
numReadyEndpoints := len(readyEndpointChains)
for i, endpointChain := range readyEndpointChains {
epIP := readyEndpoints[i].IP()
if epIP == "" { if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint. // Error parsing this endpoint has been logged. Skip to next endpoint.
continue continue
@ -1419,16 +1427,26 @@ func (proxier *Proxier) syncProxyRules() {
// Balancing rules in the per-service chain. // Balancing rules in the per-service chain.
args = append(args[:0], "-A", string(svcChain)) args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, svcNameString) args = proxier.appendServiceCommentLocked(args, svcNameString)
if i < (n - 1) { if i < (numReadyEndpoints - 1) {
// Each rule is a probabilistic match. // Each rule is a probabilistic match.
args = append(args, args = append(args,
"-m", "statistic", "-m", "statistic",
"--mode", "random", "--mode", "random",
"--probability", proxier.probability(n-i)) "--probability", proxier.probability(numReadyEndpoints-i))
} }
// The final (or only if n == 1) rule is a guaranteed match. // The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain)) args = append(args, "-j", string(endpointChain))
utilproxy.WriteLine(proxier.natRules, args...) utilproxy.WriteLine(proxier.natRules, args...)
}
// Every endpoint gets a chain, regardless of its state. This is required later since we may
// want to jump to endpoint chains that are terminating.
for i, endpointChain := range endpointChains {
epIP := endpoints[i].IP()
if epIP == "" {
// Error parsing this endpoint has been logged. Skip to next endpoint.
continue
}
// Rules in the per-endpoint chain. // Rules in the per-endpoint chain.
args = append(args[:0], "-A", string(endpointChain)) args = append(args[:0], "-A", string(endpointChain))
@ -1474,6 +1492,12 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...) "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)
// Prefer local ready endpoint chains, but fall back to ready terminating if none exist
localEndpointChains := localReadyEndpointChains
if len(localEndpointChains) == 0 {
localEndpointChains = localServingTerminatingEndpointChains
}
numLocalEndpoints := len(localEndpointChains) numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 { if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints // Blackhole all traffic since there are no local endpoints