From 732635fd4b2f9a532f79dcb564ce5c0c74d79605 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Wed, 10 Mar 2021 11:07:44 -0500 Subject: [PATCH] proxier/iptables: fallback to terminating endpoints if there are no ready endpoints Signed-off-by: Andrew Sy Kim --- pkg/proxy/iptables/proxier.go | 64 ++++++++++++++++++++++++----------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 0e7fb1a73cb..d745689d9e9 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1033,16 +1033,7 @@ func (proxier *Proxier) syncProxyRules() { // Service does not have conflicting configuration such as // externalTrafficPolicy=Local. allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) - - readyEndpoints := make([]proxy.Endpoint, 0, len(allEndpoints)) - for _, endpoint := range allEndpoints { - if !endpoint.IsReady() { - continue - } - - readyEndpoints = append(readyEndpoints, endpoint) - } - hasEndpoints := len(readyEndpoints) > 0 + hasEndpoints := len(allEndpoints) > 0 svcChain := svcInfo.servicePortChainName if hasEndpoints { @@ -1365,7 +1356,7 @@ func (proxier *Proxier) syncProxyRules() { endpoints = endpoints[:0] endpointChains = endpointChains[:0] var endpointChain utiliptables.Chain - for _, ep := range readyEndpoints { + for _, ep := range allEndpoints { epInfo, ok := ep.(*endpointsInfo) if !ok { klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep.String()) @@ -1401,16 +1392,33 @@ func (proxier *Proxier) syncProxyRules() { } } - // Now write loadbalancing & DNAT rules. - n := len(endpointChains) - localEndpointChains := make([]utiliptables.Chain, 0) + // Firstly, categorize each endpoint into three buckets: + // 1. all endpoints that are ready and NOT terminating. + // 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local + // 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local + readyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) + readyEndpoints := make([]*endpointsInfo, 0, len(allEndpoints)) + localReadyEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) + localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0, len(endpointChains)) for i, endpointChain := range endpointChains { - // Write ingress loadbalancing & DNAT rules only for services that request OnlyLocal traffic. - if svcInfo.NodeLocalExternal() && endpoints[i].IsLocal { - localEndpointChains = append(localEndpointChains, endpointChains[i]) + if endpoints[i].Ready { + readyEndpointChains = append(readyEndpointChains, endpointChain) + readyEndpoints = append(readyEndpoints, endpoints[i]) } - epIP := endpoints[i].IP() + if svc.NodeLocalExternal() && endpoints[i].IsLocal { + if endpoints[i].Ready { + localReadyEndpointChains = append(localReadyEndpointChains, endpointChain) + } else if endpoints[i].Serving && endpoints[i].Terminating { + localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain) + } + } + } + + // Now write loadbalancing & DNAT rules. + numReadyEndpoints := len(readyEndpointChains) + for i, endpointChain := range readyEndpointChains { + epIP := readyEndpoints[i].IP() if epIP == "" { // Error parsing this endpoint has been logged. Skip to next endpoint. continue @@ -1419,16 +1427,26 @@ func (proxier *Proxier) syncProxyRules() { // Balancing rules in the per-service chain. args = append(args[:0], "-A", string(svcChain)) args = proxier.appendServiceCommentLocked(args, svcNameString) - if i < (n - 1) { + if i < (numReadyEndpoints - 1) { // Each rule is a probabilistic match. args = append(args, "-m", "statistic", "--mode", "random", - "--probability", proxier.probability(n-i)) + "--probability", proxier.probability(numReadyEndpoints-i)) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) utilproxy.WriteLine(proxier.natRules, args...) + } + + // Every endpoint gets a chain, regardless of its state. This is required later since we may + // want to jump to endpoint chains that are terminating. + for i, endpointChain := range endpointChains { + epIP := endpoints[i].IP() + if epIP == "" { + // Error parsing this endpoint has been logged. Skip to next endpoint. + continue + } // Rules in the per-endpoint chain. args = append(args[:0], "-A", string(endpointChain)) @@ -1474,6 +1492,12 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString), "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...) + // Prefer local ready endpoint chains, but fall back to ready terminating if none exist + localEndpointChains := localReadyEndpointChains + if len(localEndpointChains) == 0 { + localEndpointChains = localServingTerminatingEndpointChains + } + numLocalEndpoints := len(localEndpointChains) if numLocalEndpoints == 0 { // Blackhole all traffic since there are no local endpoints