From 37838215535849e9be55602e1921200075546ba8 Mon Sep 17 00:00:00 2001 From: jornshen Date: Tue, 19 Jan 2021 21:09:03 +0800 Subject: [PATCH 1/2] move the redundant writeline writeBytesLine to proxy/util/util.go --- pkg/proxy/iptables/proxier.go | 154 +++++++++++++++------------------- pkg/proxy/ipvs/proxier.go | 70 ++++++---------- pkg/proxy/util/utils.go | 20 +++++ 3 files changed, 114 insertions(+), 130 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 78b837c5756..a4cbb1fadff 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -441,24 +441,24 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) natChains := bytes.NewBuffer(nil) natRules := bytes.NewBuffer(nil) - writeLine(natChains, "*nat") + utilproxy.WriteLine(natChains, "*nat") // Start with chains we know we need to remove. for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} { if _, found := existingNATChains[chain]; found { chainString := string(chain) - writeBytesLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete + utilproxy.WriteBytesLine(natChains, existingNATChains[chain]) // flush + utilproxy.WriteLine(natRules, "-X", chainString) // delete } } // Hunt for service and endpoint chains. for chain := range existingNATChains { chainString := string(chain) if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { - writeBytesLine(natChains, existingNATChains[chain]) // flush - writeLine(natRules, "-X", chainString) // delete + utilproxy.WriteBytesLine(natChains, existingNATChains[chain]) // flush + utilproxy.WriteLine(natRules, "-X", chainString) // delete } } - writeLine(natRules, "COMMIT") + utilproxy.WriteLine(natRules, "COMMIT") natLines := append(natChains.Bytes(), natRules.Bytes()...) // Write it. err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) @@ -478,15 +478,15 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { existingFilterChains := utiliptables.GetChainLines(utiliptables.TableFilter, iptablesData.Bytes()) filterChains := bytes.NewBuffer(nil) filterRules := bytes.NewBuffer(nil) - writeLine(filterChains, "*filter") + utilproxy.WriteLine(filterChains, "*filter") for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { if _, found := existingFilterChains[chain]; found { chainString := string(chain) - writeBytesLine(filterChains, existingFilterChains[chain]) - writeLine(filterRules, "-X", chainString) + utilproxy.WriteBytesLine(filterChains, existingFilterChains[chain]) + utilproxy.WriteLine(filterRules, "-X", chainString) } } - writeLine(filterRules, "COMMIT") + utilproxy.WriteLine(filterRules, "COMMIT") filterLines := append(filterChains.Bytes(), filterRules.Bytes()...) // Write it. if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil { @@ -928,23 +928,23 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Reset() // Write table headers. - writeLine(proxier.filterChains, "*filter") - writeLine(proxier.natChains, "*nat") + utilproxy.WriteLine(proxier.filterChains, "*filter") + utilproxy.WriteLine(proxier.natChains, "*nat") // Make sure we keep stats for the top-level chains, if they existed // (which most should have because we created them above). for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain} { if chain, ok := existingFilterChains[chainName]; ok { - writeBytesLine(proxier.filterChains, chain) + utilproxy.WriteBytesLine(proxier.filterChains, chain) } else { - writeLine(proxier.filterChains, utiliptables.MakeChainLine(chainName)) + utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(chainName)) } } for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { if chain, ok := existingNATChains[chainName]; ok { - writeBytesLine(proxier.natChains, chain) + utilproxy.WriteBytesLine(proxier.natChains, chain) } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(chainName)) + utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(chainName)) } } @@ -952,13 +952,13 @@ func (proxier *Proxier) syncProxyRules() { // this so that it is easier to flush and change, for example if the mark // value should ever change. // NB: THIS MUST MATCH the corresponding code in the kubelet - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(kubePostroutingChain), "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), "-j", "RETURN", }...) // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(kubePostroutingChain), // XOR proxier.masqueradeMark to unset it "-j", "MARK", "--xor-mark", proxier.masqueradeMark, @@ -971,12 +971,12 @@ func (proxier *Proxier) syncProxyRules() { if proxier.iptables.HasRandomFully() { masqRule = append(masqRule, "--random-fully") } - writeLine(proxier.natRules, masqRule...) + utilproxy.WriteLine(proxier.natRules, masqRule...) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(KubeMarkMasqChain), "-j", "MARK", "--or-mark", proxier.masqueradeMark, }...) @@ -1043,9 +1043,9 @@ func (proxier *Proxier) syncProxyRules() { if hasEndpoints { // Create the per-service chain, retaining counters if possible. if chain, ok := existingNATChains[svcChain]; ok { - writeBytesLine(proxier.natChains, chain) + utilproxy.WriteBytesLine(proxier.natChains, chain) } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) + utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) } activeNATChains[svcChain] = true } @@ -1055,9 +1055,9 @@ func (proxier *Proxier) syncProxyRules() { // Only for services request OnlyLocal traffic // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { - writeBytesLine(proxier.natChains, lbChain) + utilproxy.WriteBytesLine(proxier.natChains, lbChain) } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) + utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) } activeNATChains[svcXlbChain] = true } @@ -1072,19 +1072,19 @@ func (proxier *Proxier) syncProxyRules() { "--dport", strconv.Itoa(svcInfo.Port()), ) if proxier.masqueradeAll { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) } - writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { // No endpoints. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1146,17 +1146,17 @@ func (proxier *Proxier) syncProxyRules() { destChain = svcChain // This masquerades off-cluster traffic to a External IP. if proxier.localDetector.IsImplemented() { - writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) } else { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } } // Sent traffic bound for external IPs to the service chain. - writeLine(proxier.natRules, append(args, "-j", string(destChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(destChain))...) } else { // No endpoints. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1174,9 +1174,9 @@ func (proxier *Proxier) syncProxyRules() { if hasEndpoints { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { - writeBytesLine(proxier.natChains, chain) + utilproxy.WriteBytesLine(proxier.natChains, chain) } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) + utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) } activeNATChains[fwChain] = true // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. @@ -1191,7 +1191,7 @@ func (proxier *Proxier) syncProxyRules() { "--dport", strconv.Itoa(svcInfo.Port()), ) // jump to service firewall chain - writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(fwChain))...) args = append(args[:0], "-A", string(fwChain), @@ -1203,18 +1203,18 @@ func (proxier *Proxier) syncProxyRules() { // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. if !svcInfo.OnlyNodeLocalEndpoints() { - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) chosenChain = svcChain } if len(svcInfo.LoadBalancerSourceRanges()) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain - writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges() { - writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) _, cidr, err := net.ParseCIDR(src) if err != nil { klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) @@ -1226,16 +1226,16 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress)), "-j", string(chosenChain))...) } } // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) } else { // No endpoints. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1309,9 +1309,9 @@ func (proxier *Proxier) syncProxyRules() { ) if !svcInfo.OnlyNodeLocalEndpoints() { // Nodeports need SNAT, unless they're local. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. - writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { // TODO: Make all nodePorts jump to the firewall chain. // Currently we only create it for loadbalancers (#33586). @@ -1321,12 +1321,12 @@ func (proxier *Proxier) syncProxyRules() { if isIPv6 { loopback = "::1/128" } - writeLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...) - writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) } } else { // No endpoints. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "addrtype", "--dst-type", "LOCAL", @@ -1360,9 +1360,9 @@ func (proxier *Proxier) syncProxyRules() { // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { - writeBytesLine(proxier.natChains, chain) + utilproxy.WriteBytesLine(proxier.natChains, chain) } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) + utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) } activeNATChains[endpointChain] = true } @@ -1379,7 +1379,7 @@ func (proxier *Proxier) syncProxyRules() { "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "-j", string(endpointChain), ) - writeLine(proxier.natRules, args...) + utilproxy.WriteLine(proxier.natRules, args...) } } @@ -1410,13 +1410,13 @@ func (proxier *Proxier) syncProxyRules() { } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) - writeLine(proxier.natRules, args...) + utilproxy.WriteLine(proxier.natRules, args...) // Rules in the per-endpoint chain. args = append(args[:0], "-A", string(endpointChain)) args = proxier.appendServiceCommentLocked(args, svcNameString) // Handle traffic that loops back to the originator with SNAT. - writeLine(proxier.natRules, append(args, + utilproxy.WriteLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(epIP)), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. @@ -1425,7 +1425,7 @@ func (proxier *Proxier) syncProxyRules() { } // DNAT to final destination. args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].Endpoint) - writeLine(proxier.natRules, args...) + utilproxy.WriteLine(proxier.natRules, args...) } // The logic below this applies only if this service is marked as OnlyLocal @@ -1442,17 +1442,17 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, ) - writeLine(proxier.natRules, proxier.localDetector.JumpIfLocal(args, string(svcChain))...) + utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfLocal(args, string(svcChain))...) } // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local // This allows traffic originating from the host to be redirected to the service correctly, // otherwise traffic to LB IPs are dropped if there are no local endpoints. args = append(args[:0], "-A", string(svcXlbChain)) - writeLine(proxier.natRules, append(args, + utilproxy.WriteLine(proxier.natRules, append(args, "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString), "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))...) - writeLine(proxier.natRules, append(args, + utilproxy.WriteLine(proxier.natRules, append(args, "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString), "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...) @@ -1466,12 +1466,12 @@ func (proxier *Proxier) syncProxyRules() { "-j", string(KubeMarkDropChain), ) - writeLine(proxier.natRules, args...) + utilproxy.WriteLine(proxier.natRules, args...) } else { // First write session affinity rules only over local endpoints, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { for _, endpointChain := range localEndpointChains { - writeLine(proxier.natRules, + utilproxy.WriteLine(proxier.natRules, "-A", string(svcXlbChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), @@ -1497,7 +1497,7 @@ func (proxier *Proxier) syncProxyRules() { } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) - writeLine(proxier.natRules, args...) + utilproxy.WriteLine(proxier.natRules, args...) } } } @@ -1513,8 +1513,8 @@ func (proxier *Proxier) syncProxyRules() { // We must (as per iptables) write a chain-line for it, which has // the nice effect of flushing the chain. Then we can remove the // chain. - writeBytesLine(proxier.natChains, existingNATChains[chain]) - writeLine(proxier.natRules, "-X", chainString) + utilproxy.WriteBytesLine(proxier.natChains, existingNATChains[chain]) + utilproxy.WriteLine(proxier.natRules, "-X", chainString) } } @@ -1529,7 +1529,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(kubeNodePortsChain)) - writeLine(proxier.natRules, args...) + utilproxy.WriteLine(proxier.natRules, args...) // Nothing else matters after the zero CIDR. break } @@ -1544,13 +1544,13 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-d", address, "-j", string(kubeNodePortsChain)) - writeLine(proxier.natRules, args...) + utilproxy.WriteLine(proxier.natRules, args...) } // Drop the packets in INVALID state, which would potentially cause // unexpected connection reset. // https://github.com/kubernetes/kubernetes/issues/74839 - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeForwardChain), "-m", "conntrack", "--ctstate", "INVALID", @@ -1560,7 +1560,7 @@ func (proxier *Proxier) syncProxyRules() { // If the masqueradeMark has been added then we want to forward that same // traffic, this allows NodePort traffic to be forwarded even if the default // FORWARD policy is not accept. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding rules"`, "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), @@ -1570,14 +1570,14 @@ func (proxier *Proxier) syncProxyRules() { // The following two rules ensure the traffic after the initial packet // accepted by the "kubernetes forwarding rules" rule above will be // accepted. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", ) - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(kubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, "-m", "conntrack", @@ -1586,8 +1586,8 @@ func (proxier *Proxier) syncProxyRules() { ) // Write the end-of-table markers. - writeLine(proxier.filterRules, "COMMIT") - writeLine(proxier.natRules, "COMMIT") + utilproxy.WriteLine(proxier.filterRules, "COMMIT") + utilproxy.WriteLine(proxier.natRules, "COMMIT") // Sync rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table @@ -1652,24 +1652,6 @@ func (proxier *Proxier) syncProxyRules() { proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } -// Join all words with spaces, terminate with newline and write to buf. -func writeLine(buf *bytes.Buffer, words ...string) { - // We avoid strings.Join for performance reasons. - for i := range words { - buf.WriteString(words[i]) - if i < len(words)-1 { - buf.WriteByte(' ') - } else { - buf.WriteByte('\n') - } - } -} - -func writeBytesLine(buf *bytes.Buffer, bytes []byte) { - buf.Write(bytes) - buf.WriteByte('\n') -} - func openLocalPort(lp *utilproxy.LocalPort, isIPv6 bool) (utilproxy.Closeable, error) { // For ports on node IPs, open the actual port and hold it, even though we // use iptables to redirect traffic. diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 2705b1657b9..86c149964ad 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1077,8 +1077,8 @@ func (proxier *Proxier) syncProxyRules() { proxier.filterRules.Reset() // Write table headers. - writeLine(proxier.filterChains, "*filter") - writeLine(proxier.natChains, "*nat") + utilproxy.WriteLine(proxier.filterChains, "*filter") + utilproxy.WriteLine(proxier.natChains, "*nat") proxier.createAndLinkKubeChain() @@ -1696,7 +1696,7 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[set.name].Name, set.matchType, ) - writeLine(proxier.natRules, append(args, "-j", set.to)...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", set.to)...) } } @@ -1707,14 +1707,14 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, ) if proxier.masqueradeAll { - writeLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - writeLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) } else { // Masquerade all OUTPUT traffic coming from a service ip. // The kube dummy interface has all service VIPs assigned which @@ -1723,7 +1723,7 @@ func (proxier *Proxier) writeIptablesRules() { // VIP:. // Always masquerading OUTPUT (node-originating) traffic with a VIP // source ip and service port destination fixes the outgoing connections. - writeLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...) } } @@ -1736,11 +1736,11 @@ func (proxier *Proxier) writeIptablesRules() { externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") - writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) + utilproxy.WriteLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. - writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) + utilproxy.WriteLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) } if !proxier.ipsetList[kubeExternalIPSet].isEmpty() { @@ -1751,7 +1751,7 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, "dst,dst", ) - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) externalIPRules(args) } @@ -1770,16 +1770,16 @@ func (proxier *Proxier) writeIptablesRules() { "-A", string(kubeServicesChain), "-m", "addrtype", "--dst-type", "LOCAL", ) - writeLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) + utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) // mark drop for KUBE-LOAD-BALANCER - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(KubeLoadBalancerChain), "-j", string(KubeMarkMasqChain), }...) // mark drop for KUBE-FIRE-WALL - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(KubeFireWallChain), "-j", string(KubeMarkDropChain), }...) @@ -1792,7 +1792,7 @@ func (proxier *Proxier) writeIptablesRules() { // If the masqueradeMark has been added then we want to forward that same // traffic, this allows NodePort traffic to be forwarded even if the default // FORWARD policy is not accept. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(KubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding rules"`, "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), @@ -1802,14 +1802,14 @@ func (proxier *Proxier) writeIptablesRules() { // The following two rules ensure the traffic after the initial packet // accepted by the "kubernetes forwarding rules" rule above will be // accepted. - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(KubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", ) - writeLine(proxier.filterRules, + utilproxy.WriteLine(proxier.filterRules, "-A", string(KubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, "-m", "conntrack", @@ -1821,13 +1821,13 @@ func (proxier *Proxier) writeIptablesRules() { // this so that it is easier to flush and change, for example if the mark // value should ever change. // NB: THIS MUST MATCH the corresponding code in the kubelet - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(kubePostroutingChain), "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), "-j", "RETURN", }...) // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(kubePostroutingChain), // XOR proxier.masqueradeMark to unset it "-j", "MARK", "--xor-mark", proxier.masqueradeMark, @@ -1840,19 +1840,19 @@ func (proxier *Proxier) writeIptablesRules() { if proxier.iptables.HasRandomFully() { masqRule = append(masqRule, "--random-fully") } - writeLine(proxier.natRules, masqRule...) + utilproxy.WriteLine(proxier.natRules, masqRule...) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(KubeMarkMasqChain), "-j", "MARK", "--or-mark", proxier.masqueradeMark, }...) // Write the end-of-table markers. - writeLine(proxier.filterRules, "COMMIT") - writeLine(proxier.natRules, "COMMIT") + utilproxy.WriteLine(proxier.filterRules, "COMMIT") + utilproxy.WriteLine(proxier.natRules, "COMMIT") } func (proxier *Proxier) acceptIPVSTraffic() { @@ -1866,7 +1866,7 @@ func (proxier *Proxier) acceptIPVSTraffic() { default: matchType = "dst,dst" } - writeLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, []string{ "-A", string(kubeServicesChain), "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType, "-j", "ACCEPT", @@ -1896,15 +1896,15 @@ func (proxier *Proxier) createAndLinkKubeChain() { } if ch.table == utiliptables.TableNAT { if chain, ok := existingNATChains[ch.chain]; ok { - writeBytesLine(proxier.natChains, chain) + utilproxy.WriteBytesLine(proxier.natChains, chain) } else { - writeLine(proxier.natChains, utiliptables.MakeChainLine(ch.chain)) + utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(ch.chain)) } } else { if chain, ok := existingFilterChains[ch.chain]; ok { - writeBytesLine(proxier.filterChains, chain) + utilproxy.WriteBytesLine(proxier.filterChains, chain) } else { - writeLine(proxier.filterChains, utiliptables.MakeChainLine(ch.chain)) + utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(ch.chain)) } } } @@ -2173,24 +2173,6 @@ func (proxier *Proxier) getLegacyBindAddr(activeBindAddrs map[string]bool, curre return legacyAddrs } -// Join all words with spaces, terminate with newline and write to buff. -func writeLine(buf *bytes.Buffer, words ...string) { - // We avoid strings.Join for performance reasons. - for i := range words { - buf.WriteString(words[i]) - if i < len(words)-1 { - buf.WriteByte(' ') - } else { - buf.WriteByte('\n') - } - } -} - -func writeBytesLine(buf *bytes.Buffer, bytes []byte) { - buf.Write(bytes) - buf.WriteByte('\n') -} - // listenPortOpener opens ports by calling bind() and listen(). type listenPortOpener struct{} diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index 56752b3722b..98655939bee 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -17,6 +17,7 @@ limitations under the License. package util import ( + "bytes" "context" "errors" "fmt" @@ -441,3 +442,22 @@ func GetClusterIPByFamily(ipFamily v1.IPFamily, service *v1.Service) string { return "" } + +// WriteLine join all words with spaces, terminate with newline and write to buff. +func WriteLine(buf *bytes.Buffer, words ...string) { + // We avoid strings.Join for performance reasons. + for i := range words { + buf.WriteString(words[i]) + if i < len(words)-1 { + buf.WriteByte(' ') + } else { + buf.WriteByte('\n') + } + } +} + +// WriteBytesLine write bytes to buffer, terminate with newline +func WriteBytesLine(buf *bytes.Buffer, bytes []byte) { + buf.Write(bytes) + buf.WriteByte('\n') +} From 761473cd447791f343032bbc188915b24af265f7 Mon Sep 17 00:00:00 2001 From: jornshen Date: Tue, 19 Jan 2021 21:11:48 +0800 Subject: [PATCH 2/2] add ut for utils WriteLine WriteBytesLine --- pkg/proxy/util/utils_test.go | 66 ++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index fe3d6680fc1..aa1e6e18aba 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -17,10 +17,12 @@ limitations under the License. package util import ( + "bytes" "context" "fmt" "net" "reflect" + "strings" "testing" v1 "k8s.io/api/core/v1" @@ -929,3 +931,67 @@ func TestGetClusterIPByFamily(t *testing.T) { } } + +func TestWriteLine(t *testing.T) { + testCases := []struct { + name string + words []string + expected string + }{ + { + name: "write no word", + words: []string{}, + expected: "", + }, + { + name: "write one word", + words: []string{"test1"}, + expected: "test1\n", + }, + { + name: "write multi word", + words: []string{"test1", "test2", "test3"}, + expected: "test1 test2 test3\n", + }, + } + testBuffer := bytes.NewBuffer(nil) + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + testBuffer.Reset() + WriteLine(testBuffer, testCase.words...) + if !strings.EqualFold(testBuffer.String(), testCase.expected) { + t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.words, testCase.expected, testBuffer.String()) + } + }) + } +} + +func TestWriteBytesLine(t *testing.T) { + testCases := []struct { + name string + bytes []byte + expected string + }{ + { + name: "empty bytes", + bytes: []byte{}, + expected: "\n", + }, + { + name: "test bytes", + bytes: []byte("test write bytes line"), + expected: "test write bytes line\n", + }, + } + + testBuffer := bytes.NewBuffer(nil) + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + testBuffer.Reset() + WriteBytesLine(testBuffer, testCase.bytes) + if !strings.EqualFold(testBuffer.String(), testCase.expected) { + t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.bytes, testCase.expected, testBuffer.String()) + } + }) + } +}