Fix regression in kube-proxy (#106214)

* Fix regression in kube-proxy

Don't use a prepend() - that allocates.  Instead, make Write() take
either strings or slices (I wish we could express that better).

* WIP: switch to intf

* WIP: less appends

* tests and ipvs
This commit is contained in:
Tim Hockin 2021-11-08 15:14:49 -08:00 committed by GitHub
parent cda360c59f
commit 731dc8cf74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 126 additions and 88 deletions

View File

@ -761,10 +761,6 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
} }
} }
func prepend(sl []string, args ...string) []string {
return append(args, sl...)
}
const endpointChainsNumberThreshold = 1000 const endpointChainsNumberThreshold = 1000
// Assumes proxier.mu is held. // Assumes proxier.mu is held.
@ -937,7 +933,7 @@ func (proxier *Proxier) syncProxyRules() {
if proxier.iptables.HasRandomFully() { if proxier.iptables.HasRandomFully() {
masqRule = append(masqRule, "--random-fully") masqRule = append(masqRule, "--random-fully")
} }
proxier.natRules.Write(masqRule...) proxier.natRules.Write(masqRule)
// Install the kubernetes-specific masquerade mark rule. We use a whole chain for // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark // this so that it is easier to flush and change, for example if the mark
@ -1067,16 +1063,17 @@ func (proxier *Proxier) syncProxyRules() {
args = append(args[:0], "-A", string(endpointChain)) args = append(args[:0], "-A", string(endpointChain))
args = proxier.appendServiceCommentLocked(args, svcNameString) args = proxier.appendServiceCommentLocked(args, svcNameString)
// Handle traffic that loops back to the originator with SNAT. // Handle traffic that loops back to the originator with SNAT.
proxier.natRules.Write(append(args, proxier.natRules.Write(
args,
"-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epInfo.IP())), "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epInfo.IP())),
"-j", string(KubeMarkMasqChain))...) "-j", string(KubeMarkMasqChain))
// Update client-affinity lists. // Update client-affinity lists.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") args = append(args, "-m", "recent", "--name", string(endpointChain), "--set")
} }
// DNAT to final destination. // DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint) args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint)
proxier.natRules.Write(args...) proxier.natRules.Write(args)
} }
svcChain := svcInfo.servicePortChainName svcChain := svcInfo.servicePortChainName
@ -1111,22 +1108,24 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", strconv.Itoa(svcInfo.Port()), "--dport", strconv.Itoa(svcInfo.Port()),
) )
if proxier.masqueradeAll { if proxier.masqueradeAll {
args := prepend(args, "-A", string(svcChain)) proxier.natRules.Write(
args = append(args, "-j", string(KubeMarkMasqChain)) "-A", string(svcChain),
proxier.natRules.Write(args...) args,
"-j", string(KubeMarkMasqChain))
} else if proxier.localDetector.IsImplemented() { } else if proxier.localDetector.IsImplemented() {
// This masquerades off-cluster traffic to a service VIP. The idea // This masquerades off-cluster traffic to a service VIP. The idea
// is that you can establish a static route for your Service range, // is that you can establish a static route for your Service range,
// routing to any node, and that node will bridge into the Service // routing to any node, and that node will bridge into the Service
// for you. Since that might bounce off-node, we masquerade here. // for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this. // If/when we support "Local" policy for VIPs, we should update this.
args := prepend(args, "-A", string(svcChain)) proxier.natRules.Write(
args = proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain)) "-A", string(svcChain),
proxier.natRules.Write(args...) proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain)))
} }
args = prepend(args, "-A", string(kubeServicesChain)) proxier.natRules.Write(
args = append(args, "-j", string(svcChain)) "-A", string(kubeServicesChain),
proxier.natRules.Write(args...) args,
"-j", string(svcChain))
} else { } else {
// No endpoints. // No endpoints.
proxier.filterRules.Write( proxier.filterRules.Write(
@ -1189,21 +1188,25 @@ func (proxier *Proxier) syncProxyRules() {
// be always forwarded to the corresponding Service, so no need to SNAT // be always forwarded to the corresponding Service, so no need to SNAT
// If we can't differentiate the local traffic we always SNAT. // If we can't differentiate the local traffic we always SNAT.
if !svcInfo.NodeLocalExternal() { if !svcInfo.NodeLocalExternal() {
appendTo := []string{"-A", string(svcChain)}
destChain = svcChain destChain = svcChain
args := prepend(args, "-A", string(svcChain))
// This masquerades off-cluster traffic to a External IP. // This masquerades off-cluster traffic to a External IP.
if proxier.localDetector.IsImplemented() { if proxier.localDetector.IsImplemented() {
proxier.natRules.Write( proxier.natRules.Write(
proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) appendTo,
proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain)))
} else { } else {
proxier.natRules.Write( proxier.natRules.Write(
append(args, "-j", string(KubeMarkMasqChain))...) appendTo,
args,
"-j", string(KubeMarkMasqChain))
} }
} }
// Send traffic bound for external IPs to the service chain. // Send traffic bound for external IPs to the service chain.
args = prepend(args, "-A", string(kubeServicesChain))
proxier.natRules.Write( proxier.natRules.Write(
append(args, "-j", string(destChain))...) "-A", string(kubeServicesChain),
args,
"-j", string(destChain))
} else { } else {
// No endpoints. // No endpoints.
@ -1242,7 +1245,7 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", strconv.Itoa(svcInfo.Port()), "--dport", strconv.Itoa(svcInfo.Port()),
) )
// jump to service firewall chain // jump to service firewall chain
proxier.natRules.Write(append(args, "-j", string(fwChain))...) proxier.natRules.Write(args, "-j", string(fwChain))
args = append(args[:0], args = append(args[:0],
"-A", string(fwChain), "-A", string(fwChain),
@ -1254,18 +1257,18 @@ func (proxier *Proxier) syncProxyRules() {
// If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP. // If we are proxying only locally, we can retain the source IP.
if !svcInfo.NodeLocalExternal() { if !svcInfo.NodeLocalExternal() {
proxier.natRules.Write(append(args, "-j", string(KubeMarkMasqChain))...) proxier.natRules.Write(args, "-j", string(KubeMarkMasqChain))
chosenChain = svcChain chosenChain = svcChain
} }
if len(svcInfo.LoadBalancerSourceRanges()) == 0 { if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
// allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
proxier.natRules.Write(append(args, "-j", string(chosenChain))...) proxier.natRules.Write(args, "-j", string(chosenChain))
} else { } else {
// firewall filter based on each source range // firewall filter based on each source range
allowFromNode := false allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges() { for _, src := range svcInfo.LoadBalancerSourceRanges() {
proxier.natRules.Write(append(args, "-s", src, "-j", string(chosenChain))...) proxier.natRules.Write(args, "-s", src, "-j", string(chosenChain))
_, cidr, err := netutils.ParseCIDRSloppy(src) _, cidr, err := netutils.ParseCIDRSloppy(src)
if err != nil { if err != nil {
klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr)
@ -1277,13 +1280,13 @@ func (proxier *Proxier) syncProxyRules() {
// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
// Need to add the following rule to allow request on host. // Need to add the following rule to allow request on host.
if allowFromNode { if allowFromNode {
proxier.natRules.Write(append(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)), "-j", string(chosenChain))...) proxier.natRules.Write(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)), "-j", string(chosenChain))
} }
} }
// If the packet was able to reach the end of firewall chain, then it did not get DNATed. // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
// It means the packet cannot go thru the firewall, then mark it for DROP // It means the packet cannot go thru the firewall, then mark it for DROP
proxier.natRules.Write(append(args, "-j", string(KubeMarkDropChain))...) proxier.natRules.Write(args, "-j", string(KubeMarkDropChain))
} else { } else {
// No endpoints. // No endpoints.
proxier.filterRules.Write( proxier.filterRules.Write(
@ -1361,10 +1364,14 @@ func (proxier *Proxier) syncProxyRules() {
if !svcInfo.NodeLocalExternal() { if !svcInfo.NodeLocalExternal() {
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
proxier.natRules.Write( proxier.natRules.Write(
append(prepend(args, "-A", string(svcChain)), "-j", string(KubeMarkMasqChain))...) "-A", string(svcChain),
args,
"-j", string(KubeMarkMasqChain))
// Jump to the service chain. // Jump to the service chain.
proxier.natRules.Write( proxier.natRules.Write(
append(prepend(args, "-A", string(kubeNodePortsChain)), "-j", string(svcChain))...) "-A", string(kubeNodePortsChain),
args,
"-j", string(svcChain))
} else { } else {
// TODO: Make all nodePorts jump to the firewall chain. // TODO: Make all nodePorts jump to the firewall chain.
// Currently we only create it for loadbalancers (#33586). // Currently we only create it for loadbalancers (#33586).
@ -1374,11 +1381,15 @@ func (proxier *Proxier) syncProxyRules() {
if isIPv6 { if isIPv6 {
loopback = "::1/128" loopback = "::1/128"
} }
args = prepend(args, "-A", string(kubeNodePortsChain)) appendTo := []string{"-A", string(kubeNodePortsChain)}
proxier.natRules.Write( proxier.natRules.Write(
append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...) appendTo,
args,
"-s", loopback, "-j", string(KubeMarkMasqChain))
proxier.natRules.Write( proxier.natRules.Write(
append(args, "-j", string(svcXlbChain))...) appendTo,
args,
"-j", string(svcXlbChain))
} }
} else { } else {
// No endpoints. // No endpoints.
@ -1422,7 +1433,7 @@ func (proxier *Proxier) syncProxyRules() {
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(endpointChain), "-j", string(endpointChain),
) )
proxier.natRules.Write(args...) proxier.natRules.Write(args)
} }
} }
@ -1440,8 +1451,7 @@ func (proxier *Proxier) syncProxyRules() {
"--probability", proxier.probability(numReadyEndpoints-i)) "--probability", proxier.probability(numReadyEndpoints-i))
} }
// The final (or only if n == 1) rule is a guaranteed match. // The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain)) proxier.natRules.Write(args, "-j", string(endpointChain))
proxier.natRules.Write(args...)
} }
// The logic below this applies only if this service is marked as OnlyLocal // The logic below this applies only if this service is marked as OnlyLocal
@ -1458,19 +1468,21 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", "-m", "comment", "--comment",
`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
) )
proxier.natRules.Write(proxier.localDetector.JumpIfLocal(args, string(svcChain))...) proxier.natRules.Write(proxier.localDetector.JumpIfLocal(args, string(svcChain)))
} }
// Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local
// This allows traffic originating from the host to be redirected to the service correctly, // This allows traffic originating from the host to be redirected to the service correctly,
// otherwise traffic to LB IPs are dropped if there are no local endpoints. // otherwise traffic to LB IPs are dropped if there are no local endpoints.
args = append(args[:0], "-A", string(svcXlbChain)) args = append(args[:0], "-A", string(svcXlbChain))
proxier.natRules.Write(append(args, proxier.natRules.Write(
args,
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))...) "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))
proxier.natRules.Write(append(args, proxier.natRules.Write(
args,
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...) "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))
numLocalEndpoints := len(localEndpointChains) numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 { if numLocalEndpoints == 0 {
@ -1482,7 +1494,7 @@ func (proxier *Proxier) syncProxyRules() {
"-j", "-j",
string(KubeMarkDropChain), string(KubeMarkDropChain),
) )
proxier.natRules.Write(args...) proxier.natRules.Write(args)
} else { } else {
// First write session affinity rules only over local endpoints, if applicable. // First write session affinity rules only over local endpoints, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
@ -1512,8 +1524,7 @@ func (proxier *Proxier) syncProxyRules() {
"--probability", proxier.probability(numLocalEndpoints-i)) "--probability", proxier.probability(numLocalEndpoints-i))
} }
// The final (or only if n == 1) rule is a guaranteed match. // The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain)) proxier.natRules.Write(args, "-j", string(endpointChain))
proxier.natRules.Write(args...)
} }
} }
} }
@ -1545,7 +1556,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
"-m", "addrtype", "--dst-type", "LOCAL", "-m", "addrtype", "--dst-type", "LOCAL",
"-j", string(kubeNodePortsChain)) "-j", string(kubeNodePortsChain))
proxier.natRules.Write(args...) proxier.natRules.Write(args)
// Nothing else matters after the zero CIDR. // Nothing else matters after the zero CIDR.
break break
} }
@ -1560,7 +1571,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`,
"-d", address, "-d", address,
"-j", string(kubeNodePortsChain)) "-j", string(kubeNodePortsChain))
proxier.natRules.Write(args...) proxier.natRules.Write(args)
} }
// Drop the packets in INVALID state, which would potentially cause // Drop the packets in INVALID state, which would potentially cause

View File

@ -1704,7 +1704,7 @@ func (proxier *Proxier) writeIptablesRules() {
"-m", "set", "--match-set", proxier.ipsetList[set.name].Name, "-m", "set", "--match-set", proxier.ipsetList[set.name].Name,
set.matchType, set.matchType,
) )
proxier.natRules.Write(append(args, "-j", set.to)...) proxier.natRules.Write(args, "-j", set.to)
} }
} }
@ -1715,14 +1715,14 @@ func (proxier *Proxier) writeIptablesRules() {
"-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name,
) )
if proxier.masqueradeAll { if proxier.masqueradeAll {
proxier.natRules.Write(append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) proxier.natRules.Write(args, "dst,dst", "-j", string(KubeMarkMasqChain))
} else if proxier.localDetector.IsImplemented() { } else if proxier.localDetector.IsImplemented() {
// This masquerades off-cluster traffic to a service VIP. The idea // This masquerades off-cluster traffic to a service VIP. The idea
// is that you can establish a static route for your Service range, // is that you can establish a static route for your Service range,
// routing to any node, and that node will bridge into the Service // routing to any node, and that node will bridge into the Service
// for you. Since that might bounce off-node, we masquerade here. // for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this. // If/when we support "Local" policy for VIPs, we should update this.
proxier.natRules.Write(proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) proxier.natRules.Write(proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain)))
} else { } else {
// Masquerade all OUTPUT traffic coming from a service ip. // Masquerade all OUTPUT traffic coming from a service ip.
// The kube dummy interface has all service VIPs assigned which // The kube dummy interface has all service VIPs assigned which
@ -1731,7 +1731,7 @@ func (proxier *Proxier) writeIptablesRules() {
// VIP:<service port>. // VIP:<service port>.
// Always masquerading OUTPUT (node-originating) traffic with a VIP // Always masquerading OUTPUT (node-originating) traffic with a VIP
// source ip and service port destination fixes the outgoing connections. // source ip and service port destination fixes the outgoing connections.
proxier.natRules.Write(append(args, "src,dst", "-j", string(KubeMarkMasqChain))...) proxier.natRules.Write(args, "src,dst", "-j", string(KubeMarkMasqChain))
} }
} }
@ -1744,11 +1744,11 @@ func (proxier *Proxier) writeIptablesRules() {
externalTrafficOnlyArgs := append(args, externalTrafficOnlyArgs := append(args,
"-m", "physdev", "!", "--physdev-is-in", "-m", "physdev", "!", "--physdev-is-in",
"-m", "addrtype", "!", "--src-type", "LOCAL") "-m", "addrtype", "!", "--src-type", "LOCAL")
proxier.natRules.Write(append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) proxier.natRules.Write(externalTrafficOnlyArgs, "-j", "ACCEPT")
dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL")
// Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local.
// This covers cases like GCE load-balancers which get added to the local routing table. // This covers cases like GCE load-balancers which get added to the local routing table.
proxier.natRules.Write(append(dstLocalOnlyArgs, "-j", "ACCEPT")...) proxier.natRules.Write(dstLocalOnlyArgs, "-j", "ACCEPT")
} }
if !proxier.ipsetList[kubeExternalIPSet].isEmpty() { if !proxier.ipsetList[kubeExternalIPSet].isEmpty() {
@ -1759,7 +1759,7 @@ func (proxier *Proxier) writeIptablesRules() {
"-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name,
"dst,dst", "dst,dst",
) )
proxier.natRules.Write(append(args, "-j", string(KubeMarkMasqChain))...) proxier.natRules.Write(args, "-j", string(KubeMarkMasqChain))
externalIPRules(args) externalIPRules(args)
} }
@ -1778,19 +1778,19 @@ func (proxier *Proxier) writeIptablesRules() {
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "addrtype", "--dst-type", "LOCAL", "-m", "addrtype", "--dst-type", "LOCAL",
) )
proxier.natRules.Write(append(args, "-j", string(KubeNodePortChain))...) proxier.natRules.Write(args, "-j", string(KubeNodePortChain))
// mark drop for KUBE-LOAD-BALANCER // mark drop for KUBE-LOAD-BALANCER
proxier.natRules.Write([]string{ proxier.natRules.Write(
"-A", string(KubeLoadBalancerChain), "-A", string(KubeLoadBalancerChain),
"-j", string(KubeMarkMasqChain), "-j", string(KubeMarkMasqChain),
}...) )
// mark drop for KUBE-FIRE-WALL // mark drop for KUBE-FIRE-WALL
proxier.natRules.Write([]string{ proxier.natRules.Write(
"-A", string(KubeFireWallChain), "-A", string(KubeFireWallChain),
"-j", string(KubeMarkDropChain), "-j", string(KubeMarkDropChain),
}...) )
// Accept all traffic with destination of ipvs virtual service, in case other iptables rules // Accept all traffic with destination of ipvs virtual service, in case other iptables rules
// block the traffic, that may result in ipvs rules invalid. // block the traffic, that may result in ipvs rules invalid.
@ -1837,17 +1837,17 @@ func (proxier *Proxier) writeIptablesRules() {
// this so that it is easier to flush and change, for example if the mark // this so that it is easier to flush and change, for example if the mark
// value should ever change. // value should ever change.
// NB: THIS MUST MATCH the corresponding code in the kubelet // NB: THIS MUST MATCH the corresponding code in the kubelet
proxier.natRules.Write([]string{ proxier.natRules.Write(
"-A", string(kubePostroutingChain), "-A", string(kubePostroutingChain),
"-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark),
"-j", "RETURN", "-j", "RETURN",
}...) )
// Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack.
proxier.natRules.Write([]string{ proxier.natRules.Write(
"-A", string(kubePostroutingChain), "-A", string(kubePostroutingChain),
// XOR proxier.masqueradeMark to unset it // XOR proxier.masqueradeMark to unset it
"-j", "MARK", "--xor-mark", proxier.masqueradeMark, "-j", "MARK", "--xor-mark", proxier.masqueradeMark,
}...) )
masqRule := []string{ masqRule := []string{
"-A", string(kubePostroutingChain), "-A", string(kubePostroutingChain),
"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
@ -1856,15 +1856,15 @@ func (proxier *Proxier) writeIptablesRules() {
if proxier.iptables.HasRandomFully() { if proxier.iptables.HasRandomFully() {
masqRule = append(masqRule, "--random-fully") masqRule = append(masqRule, "--random-fully")
} }
proxier.natRules.Write(masqRule...) proxier.natRules.Write(masqRule)
// Install the kubernetes-specific masquerade mark rule. We use a whole chain for // Install the kubernetes-specific masquerade mark rule. We use a whole chain for
// this so that it is easier to flush and change, for example if the mark // this so that it is easier to flush and change, for example if the mark
// value should ever change. // value should ever change.
proxier.natRules.Write([]string{ proxier.natRules.Write(
"-A", string(KubeMarkMasqChain), "-A", string(KubeMarkMasqChain),
"-j", "MARK", "--or-mark", proxier.masqueradeMark, "-j", "MARK", "--or-mark", proxier.masqueradeMark,
}...) )
// Write the end-of-table markers. // Write the end-of-table markers.
proxier.filterRules.Write("COMMIT") proxier.filterRules.Write("COMMIT")
@ -1882,11 +1882,11 @@ func (proxier *Proxier) acceptIPVSTraffic() {
default: default:
matchType = "dst,dst" matchType = "dst,dst"
} }
proxier.natRules.Write([]string{ proxier.natRules.Write(
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType, "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType,
"-j", "ACCEPT", "-j", "ACCEPT",
}...) )
} }
} }
} }

View File

@ -472,17 +472,29 @@ type LineBuffer struct {
b bytes.Buffer b bytes.Buffer
} }
// Write joins all words with spaces, terminates with newline and writes to buf. // Write takes a list of arguments, each a string or []string, joins all the
func (buf *LineBuffer) Write(words ...string) { // individual strings with spaces, terminates with newline, and writes to buf.
// We avoid strings.Join for performance reasons. // Any other argument type will panic.
for i := range words { func (buf *LineBuffer) Write(args ...interface{}) {
buf.b.WriteString(words[i]) for i, arg := range args {
if i < len(words)-1 { if i > 0 {
buf.b.WriteByte(' ') buf.b.WriteByte(' ')
} else { }
buf.b.WriteByte('\n') switch x := arg.(type) {
case string:
buf.b.WriteString(x)
case []string:
for j, s := range x {
if j > 0 {
buf.b.WriteByte(' ')
}
buf.b.WriteString(s)
}
default:
panic(fmt.Sprintf("unknown argument type: %T", x))
} }
} }
buf.b.WriteByte('\n')
} }
// WriteBytes writes bytes to buffer, and terminates with newline. // WriteBytes writes bytes to buffer, and terminates with newline.

View File

@ -1148,41 +1148,56 @@ func TestRevertPorts(t *testing.T) {
} }
} }
func TestWriteLine(t *testing.T) { func TestLineBufferWrite(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
words []string input []interface{}
expected string expected string
}{ }{
{ {
name: "write no word", name: "none",
words: []string{}, input: []interface{}{},
expected: "", expected: "\n",
}, },
{ {
name: "write one word", name: "one string",
words: []string{"test1"}, input: []interface{}{"test1"},
expected: "test1\n", expected: "test1\n",
}, },
{ {
name: "write multi word", name: "one slice",
words: []string{"test1", "test2", "test3"}, input: []interface{}{[]string{"test1", "test2"}},
expected: "test1 test2 test3\n", expected: "test1 test2\n",
},
{
name: "mixed",
input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"},
expected: "s1 s2 s3 s4 s5 s6 s7\n",
}, },
} }
testBuffer := LineBuffer{} testBuffer := LineBuffer{}
for _, testCase := range testCases { for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
testBuffer.Reset() testBuffer.Reset()
testBuffer.Write(testCase.words...) testBuffer.Write(testCase.input...)
if want, got := testCase.expected, string(testBuffer.Bytes()); !strings.EqualFold(want, got) { if want, got := testCase.expected, string(testBuffer.Bytes()); !strings.EqualFold(want, got) {
t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.words, want, got) t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got)
} }
}) })
} }
} }
func TestWriteBytesLine(t *testing.T) { func TestLineBufferWritePanic(t *testing.T) {
defer func() {
if r := recover(); r == nil {
t.Errorf("did not panic")
}
}()
testBuffer := LineBuffer{}
testBuffer.Write("string", []string{"a", "slice"}, 1234)
}
func TestLineBufferWriteBytes(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
bytes []byte bytes []byte