mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 02:41:25 +00:00
Merge pull request #106030 from danwinship/session-affinity-readiness
misc iptables proxy fixes
This commit is contained in:
commit
cb040e5097
@ -955,12 +955,8 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// you should always do one of the below:
|
||||
// slice = slice[:0] // and then append to it
|
||||
// slice = append(slice[:0], ...)
|
||||
endpoints := make([]*endpointsInfo, 0)
|
||||
endpointChains := make([]utiliptables.Chain, 0)
|
||||
readyEndpoints := make([]*endpointsInfo, 0)
|
||||
readyEndpointChains := make([]utiliptables.Chain, 0)
|
||||
localReadyEndpointChains := make([]utiliptables.Chain, 0)
|
||||
localServingTerminatingEndpointChains := make([]utiliptables.Chain, 0)
|
||||
localEndpointChains := make([]utiliptables.Chain, 0)
|
||||
|
||||
// To avoid growing this slice, we arbitrarily set its size to 64,
|
||||
// there is never more than that many arguments for a single line.
|
||||
@ -1002,7 +998,82 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
// Service does not have conflicting configuration such as
|
||||
// externalTrafficPolicy=Local.
|
||||
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
|
||||
hasEndpoints := len(allEndpoints) > 0
|
||||
|
||||
// Scan the endpoints list to see what we have. "hasEndpoints" will be true
|
||||
// if there are any usable endpoints for this service anywhere in the cluster.
|
||||
var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
|
||||
for _, ep := range allEndpoints {
|
||||
if ep.IsReady() {
|
||||
hasEndpoints = true
|
||||
if ep.GetIsLocal() {
|
||||
hasLocalReadyEndpoints = true
|
||||
}
|
||||
} else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
|
||||
if ep.IsServing() && ep.IsTerminating() {
|
||||
hasEndpoints = true
|
||||
if ep.GetIsLocal() {
|
||||
hasLocalServingTerminatingEndpoints = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints
|
||||
|
||||
// Generate the per-endpoint chains.
|
||||
readyEndpointChains = readyEndpointChains[:0]
|
||||
localEndpointChains = localEndpointChains[:0]
|
||||
for _, ep := range allEndpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
if !ok {
|
||||
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
|
||||
continue
|
||||
}
|
||||
|
||||
endpointChain := epInfo.endpointChain(svcNameString, protocol)
|
||||
endpointInUse := false
|
||||
|
||||
if epInfo.Ready {
|
||||
readyEndpointChains = append(readyEndpointChains, endpointChain)
|
||||
endpointInUse = true
|
||||
}
|
||||
if svc.NodeLocalExternal() && epInfo.IsLocal {
|
||||
if useTerminatingEndpoints {
|
||||
if epInfo.Serving && epInfo.Terminating {
|
||||
localEndpointChains = append(localEndpointChains, endpointChain)
|
||||
endpointInUse = true
|
||||
}
|
||||
} else if epInfo.Ready {
|
||||
localEndpointChains = append(localEndpointChains, endpointChain)
|
||||
endpointInUse = true
|
||||
}
|
||||
}
|
||||
|
||||
if !endpointInUse {
|
||||
continue
|
||||
}
|
||||
|
||||
// Create the endpoint chain, retaining counters if possible.
|
||||
if chain, ok := existingNATChains[endpointChain]; ok {
|
||||
utilproxy.WriteBytesLine(proxier.natChains, chain)
|
||||
} else {
|
||||
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
|
||||
}
|
||||
activeNATChains[endpointChain] = true
|
||||
|
||||
args = append(args[:0], "-A", string(endpointChain))
|
||||
args = proxier.appendServiceCommentLocked(args, svcNameString)
|
||||
// Handle traffic that loops back to the originator with SNAT.
|
||||
utilproxy.WriteLine(proxier.natRules, append(args,
|
||||
"-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epInfo.IP())),
|
||||
"-j", string(KubeMarkMasqChain))...)
|
||||
// Update client-affinity lists.
|
||||
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
|
||||
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
|
||||
}
|
||||
// DNAT to final destination.
|
||||
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint)
|
||||
utilproxy.WriteLine(proxier.natRules, args...)
|
||||
}
|
||||
|
||||
svcChain := svcInfo.servicePortChainName
|
||||
if hasEndpoints {
|
||||
@ -1319,35 +1390,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Generate the per-endpoint chains. We do this in multiple passes so we
|
||||
// can group rules together.
|
||||
// These two slices parallel each other - keep in sync
|
||||
endpoints = endpoints[:0]
|
||||
endpointChains = endpointChains[:0]
|
||||
var endpointChain utiliptables.Chain
|
||||
for _, ep := range allEndpoints {
|
||||
epInfo, ok := ep.(*endpointsInfo)
|
||||
if !ok {
|
||||
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
|
||||
continue
|
||||
}
|
||||
|
||||
endpoints = append(endpoints, epInfo)
|
||||
endpointChain = epInfo.endpointChain(svcNameString, protocol)
|
||||
endpointChains = append(endpointChains, endpointChain)
|
||||
|
||||
// Create the endpoint chain, retaining counters if possible.
|
||||
if chain, ok := existingNATChains[endpointChain]; ok {
|
||||
utilproxy.WriteBytesLine(proxier.natChains, chain)
|
||||
} else {
|
||||
utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain))
|
||||
}
|
||||
activeNATChains[endpointChain] = true
|
||||
}
|
||||
|
||||
// First write session affinity rules, if applicable.
|
||||
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
|
||||
for _, endpointChain := range endpointChains {
|
||||
for _, endpointChain := range readyEndpointChains {
|
||||
args = append(args[:0],
|
||||
"-A", string(svcChain),
|
||||
)
|
||||
@ -1361,38 +1406,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
}
|
||||
}
|
||||
|
||||
// Firstly, categorize each endpoint into three buckets:
|
||||
// 1. all endpoints that are ready and NOT terminating.
|
||||
// 2. all endpoints that are local, ready and NOT terminating, and externalTrafficPolicy=Local
|
||||
// 3. all endpoints that are local, serving and terminating, and externalTrafficPolicy=Local
|
||||
readyEndpointChains = readyEndpointChains[:0]
|
||||
readyEndpoints := readyEndpoints[:0]
|
||||
localReadyEndpointChains := localReadyEndpointChains[:0]
|
||||
localServingTerminatingEndpointChains := localServingTerminatingEndpointChains[:0]
|
||||
for i, endpointChain := range endpointChains {
|
||||
if endpoints[i].Ready {
|
||||
readyEndpointChains = append(readyEndpointChains, endpointChain)
|
||||
readyEndpoints = append(readyEndpoints, endpoints[i])
|
||||
}
|
||||
|
||||
if svc.NodeLocalExternal() && endpoints[i].IsLocal {
|
||||
if endpoints[i].Ready {
|
||||
localReadyEndpointChains = append(localReadyEndpointChains, endpointChain)
|
||||
} else if endpoints[i].Serving && endpoints[i].Terminating {
|
||||
localServingTerminatingEndpointChains = append(localServingTerminatingEndpointChains, endpointChain)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Now write loadbalancing & DNAT rules.
|
||||
// Now write loadbalancing rules
|
||||
numReadyEndpoints := len(readyEndpointChains)
|
||||
for i, endpointChain := range readyEndpointChains {
|
||||
epIP := readyEndpoints[i].IP()
|
||||
if epIP == "" {
|
||||
// Error parsing this endpoint has been logged. Skip to next endpoint.
|
||||
continue
|
||||
}
|
||||
|
||||
// Balancing rules in the per-service chain.
|
||||
args = append(args[:0], "-A", string(svcChain))
|
||||
args = proxier.appendServiceCommentLocked(args, svcNameString)
|
||||
@ -1408,31 +1424,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
utilproxy.WriteLine(proxier.natRules, args...)
|
||||
}
|
||||
|
||||
// Every endpoint gets a chain, regardless of its state. This is required later since we may
|
||||
// want to jump to endpoint chains that are terminating.
|
||||
for i, endpointChain := range endpointChains {
|
||||
epIP := endpoints[i].IP()
|
||||
if epIP == "" {
|
||||
// Error parsing this endpoint has been logged. Skip to next endpoint.
|
||||
continue
|
||||
}
|
||||
|
||||
// Rules in the per-endpoint chain.
|
||||
args = append(args[:0], "-A", string(endpointChain))
|
||||
args = proxier.appendServiceCommentLocked(args, svcNameString)
|
||||
// Handle traffic that loops back to the originator with SNAT.
|
||||
utilproxy.WriteLine(proxier.natRules, append(args,
|
||||
"-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epIP)),
|
||||
"-j", string(KubeMarkMasqChain))...)
|
||||
// Update client-affinity lists.
|
||||
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
|
||||
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
|
||||
}
|
||||
// DNAT to final destination.
|
||||
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint)
|
||||
utilproxy.WriteLine(proxier.natRules, args...)
|
||||
}
|
||||
|
||||
// The logic below this applies only if this service is marked as OnlyLocal
|
||||
if !svcInfo.NodeLocalExternal() {
|
||||
continue
|
||||
@ -1461,12 +1452,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
|
||||
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...)
|
||||
|
||||
// Prefer local ready endpoint chains, but fall back to ready terminating if none exist
|
||||
localEndpointChains := localReadyEndpointChains
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) && len(localEndpointChains) == 0 {
|
||||
localEndpointChains = localServingTerminatingEndpointChains
|
||||
}
|
||||
|
||||
numLocalEndpoints := len(localEndpointChains)
|
||||
if numLocalEndpoints == 0 {
|
||||
// Blackhole all traffic since there are no local endpoints
|
||||
@ -1596,6 +1581,11 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-j", "ACCEPT",
|
||||
)
|
||||
|
||||
numberFilterIptablesRules := utilproxy.CountBytesLines(proxier.filterRules.Bytes())
|
||||
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(numberFilterIptablesRules))
|
||||
numberNatIptablesRules := utilproxy.CountBytesLines(proxier.natRules.Bytes())
|
||||
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(numberNatIptablesRules))
|
||||
|
||||
// Write the end-of-table markers.
|
||||
utilproxy.WriteLine(proxier.filterRules, "COMMIT")
|
||||
utilproxy.WriteLine(proxier.natRules, "COMMIT")
|
||||
@ -1608,11 +1598,6 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
proxier.iptablesData.Write(proxier.natChains.Bytes())
|
||||
proxier.iptablesData.Write(proxier.natRules.Bytes())
|
||||
|
||||
numberFilterIptablesRules := utilproxy.CountBytesLines(proxier.filterRules.Bytes())
|
||||
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableFilter)).Set(float64(numberFilterIptablesRules))
|
||||
numberNatIptablesRules := utilproxy.CountBytesLines(proxier.natRules.Bytes())
|
||||
metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(numberNatIptablesRules))
|
||||
|
||||
klog.V(5).InfoS("Restoring iptables", "rules", proxier.iptablesData.Bytes())
|
||||
err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
|
||||
if err != nil {
|
||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user