From f558554ce0350ca85f99d316e59f8149043e7c7a Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Wed, 3 Nov 2021 23:55:11 -0700 Subject: [PATCH 1/2] kube-proxy: minor cleanup Get rid of overlapping helper functions. --- pkg/proxy/iptables/proxier.go | 53 ++++++++++++++++++++++++----------- pkg/proxy/util/utils.go | 12 -------- pkg/proxy/util/utils_test.go | 38 ------------------------- 3 files changed, 36 insertions(+), 67 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 15323340c8e..b24aa1fd628 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -760,6 +760,10 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE } } +func prepend(sl []string, args ...string) []string { + return append(args, sl...) +} + const endpointChainsNumberThreshold = 1000 // Assumes proxier.mu is held. @@ -914,17 +918,16 @@ func (proxier *Proxier) syncProxyRules() { // this so that it is easier to flush and change, for example if the mark // value should ever change. // NB: THIS MUST MATCH the corresponding code in the kubelet - utilproxy.WriteLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, "-A", string(kubePostroutingChain), "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), "-j", "RETURN", - }...) + ) // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. - utilproxy.WriteLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, "-A", string(kubePostroutingChain), - // XOR proxier.masqueradeMark to unset it "-j", "MARK", "--xor-mark", proxier.masqueradeMark, - }...) + ) masqRule := []string{ "-A", string(kubePostroutingChain), "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, @@ -938,10 +941,10 @@ func (proxier *Proxier) syncProxyRules() { // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. - utilproxy.WriteLine(proxier.natRules, []string{ + utilproxy.WriteLine(proxier.natRules, "-A", string(KubeMarkMasqChain), "-j", "MARK", "--or-mark", proxier.masqueradeMark, - }...) + ) // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set @@ -1107,16 +1110,22 @@ func (proxier *Proxier) syncProxyRules() { "--dport", strconv.Itoa(svcInfo.Port()), ) if proxier.masqueradeAll { - utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), append(args, "-j", string(KubeMarkMasqChain))...) + args := prepend(args, "-A", string(svcChain)) + args = append(args, "-j", string(KubeMarkMasqChain)) + utilproxy.WriteLine(proxier.natRules, args...) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) + args := prepend(args, "-A", string(svcChain)) + args = proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain)) + utilproxy.WriteLine(proxier.natRules, args...) } - utilproxy.WriteRuleLine(proxier.natRules, string(kubeServicesChain), append(args, "-j", string(svcChain))...) + args = prepend(args, "-A", string(kubeServicesChain)) + args = append(args, "-j", string(svcChain)) + utilproxy.WriteLine(proxier.natRules, args...) } else { // No endpoints. utilproxy.WriteLine(proxier.filterRules, @@ -1180,15 +1189,20 @@ func (proxier *Proxier) syncProxyRules() { // If we can't differentiate the local traffic we always SNAT. if !svcInfo.NodeLocalExternal() { destChain = svcChain + args := prepend(args, "-A", string(svcChain)) // This masquerades off-cluster traffic to a External IP. if proxier.localDetector.IsImplemented() { - utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, + proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) } else { - utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), append(args, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, + append(args, "-j", string(KubeMarkMasqChain))...) } } // Send traffic bound for external IPs to the service chain. - utilproxy.WriteRuleLine(proxier.natRules, string(kubeServicesChain), append(args, "-j", string(destChain))...) + args = prepend(args, "-A", string(kubeServicesChain)) + utilproxy.WriteLine(proxier.natRules, + append(args, "-j", string(destChain))...) } else { // No endpoints. @@ -1345,9 +1359,11 @@ func (proxier *Proxier) syncProxyRules() { ) if !svcInfo.NodeLocalExternal() { // Nodeports need SNAT, unless they're local. - utilproxy.WriteRuleLine(proxier.natRules, string(svcChain), append(args, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, + append(prepend(args, "-A", string(svcChain)), "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. - utilproxy.WriteRuleLine(proxier.natRules, string(kubeNodePortsChain), append(args, "-j", string(svcChain))...) + utilproxy.WriteLine(proxier.natRules, + append(prepend(args, "-A", string(kubeNodePortsChain)), "-j", string(svcChain))...) } else { // TODO: Make all nodePorts jump to the firewall chain. // Currently we only create it for loadbalancers (#33586). @@ -1357,8 +1373,11 @@ func (proxier *Proxier) syncProxyRules() { if isIPv6 { loopback = "::1/128" } - utilproxy.WriteRuleLine(proxier.natRules, string(kubeNodePortsChain), append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...) - utilproxy.WriteRuleLine(proxier.natRules, string(kubeNodePortsChain), append(args, "-j", string(svcXlbChain))...) + args = prepend(args, "-A", string(kubeNodePortsChain)) + utilproxy.WriteLine(proxier.natRules, + append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...) + utilproxy.WriteLine(proxier.natRules, + append(args, "-j", string(svcXlbChain))...) } } else { // No endpoints. diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index aaabc82c020..a3b7c0b675f 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -481,18 +481,6 @@ func WriteLine(buf *bytes.Buffer, words ...string) { } } -// WriteRuleLine prepends the strings "-A" and chainName to the buffer and calls -// WriteLine to join all the words into the buffer and terminate with newline. -func WriteRuleLine(buf *bytes.Buffer, chainName string, words ...string) { - if len(words) == 0 { - return - } - buf.WriteString("-A ") - buf.WriteString(chainName) - buf.WriteByte(' ') - WriteLine(buf, words...) -} - // WriteBytesLine write bytes to buffer, terminate with newline func WriteBytesLine(buf *bytes.Buffer, bytes []byte) { buf.Write(bytes) diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index dff2dbb97be..cab177150d7 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -1183,44 +1183,6 @@ func TestWriteLine(t *testing.T) { } } -func TestWriteRuleLine(t *testing.T) { - testCases := []struct { - name string - chainName string - words []string - expected string - }{ - { - name: "write no line due to no words", - chainName: "KUBE-SVC-FOO", - words: []string{}, - expected: "", - }, - { - name: "write one line", - chainName: "KUBE-XLB-FOO", - words: []string{"test1"}, - expected: "-A KUBE-XLB-FOO test1\n", - }, - { - name: "write multi word line", - chainName: "lolChain", - words: []string{"test1", "test2", "test3"}, - expected: "-A lolChain test1 test2 test3\n", - }, - } - testBuffer := bytes.NewBuffer(nil) - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - testBuffer.Reset() - WriteRuleLine(testBuffer, testCase.chainName, testCase.words...) - if !strings.EqualFold(testBuffer.String(), testCase.expected) { - t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.words, testCase.expected, testBuffer.String()) - } - }) - } -} - func TestWriteBytesLine(t *testing.T) { testCases := []struct { name string From f662170ff72a1d6635ad66cb9dcc659693e520c3 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 4 Nov 2021 00:31:19 -0700 Subject: [PATCH 2/2] kube-proxy: make iptables buffer-writing cleaner --- pkg/proxy/iptables/proxier.go | 162 ++++++++++++++--------------- pkg/proxy/iptables/proxier_test.go | 8 +- pkg/proxy/ipvs/proxier.go | 70 ++++++------- pkg/proxy/ipvs/proxier_test.go | 13 +-- pkg/proxy/util/utils.go | 30 ++++-- pkg/proxy/util/utils_test.go | 21 ++-- 6 files changed, 158 insertions(+), 146 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index b24aa1fd628..9bcda7b077c 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -223,10 +223,10 @@ type Proxier struct { // that are significantly impacting performance. iptablesData *bytes.Buffer existingFilterChainsData *bytes.Buffer - filterChains *bytes.Buffer - filterRules *bytes.Buffer - natChains *bytes.Buffer - natRules *bytes.Buffer + filterChains utilproxy.LineBuffer + filterRules utilproxy.LineBuffer + natChains utilproxy.LineBuffer + natRules utilproxy.LineBuffer // endpointChainsNumber is the total amount of endpointChains across all // services that we will generate (it is computed at the beginning of @@ -315,10 +315,10 @@ func NewProxier(ipt utiliptables.Interface, precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), existingFilterChainsData: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), + filterChains: utilproxy.LineBuffer{}, + filterRules: utilproxy.LineBuffer{}, + natChains: utilproxy.LineBuffer{}, + natRules: utilproxy.LineBuffer{}, nodePortAddresses: nodePortAddresses, networkInterfacer: utilproxy.RealNetwork{}, } @@ -432,26 +432,26 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { encounteredError = true } else { existingNATChains := utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes()) - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) - utilproxy.WriteLine(natChains, "*nat") + natChains := &utilproxy.LineBuffer{} + natRules := &utilproxy.LineBuffer{} + natChains.Write("*nat") // Start with chains we know we need to remove. for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain} { if _, found := existingNATChains[chain]; found { chainString := string(chain) - utilproxy.WriteBytesLine(natChains, existingNATChains[chain]) // flush - utilproxy.WriteLine(natRules, "-X", chainString) // delete + natChains.WriteBytes(existingNATChains[chain]) // flush + natRules.Write("-X", chainString) // delete } } // Hunt for service and endpoint chains. for chain := range existingNATChains { chainString := string(chain) if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { - utilproxy.WriteBytesLine(natChains, existingNATChains[chain]) // flush - utilproxy.WriteLine(natRules, "-X", chainString) // delete + natChains.WriteBytes(existingNATChains[chain]) // flush + natRules.Write("-X", chainString) // delete } } - utilproxy.WriteLine(natRules, "COMMIT") + natRules.Write("COMMIT") natLines := append(natChains.Bytes(), natRules.Bytes()...) // Write it. err = ipt.Restore(utiliptables.TableNAT, natLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) @@ -469,17 +469,17 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { encounteredError = true } else { existingFilterChains := utiliptables.GetChainLines(utiliptables.TableFilter, iptablesData.Bytes()) - filterChains := bytes.NewBuffer(nil) - filterRules := bytes.NewBuffer(nil) - utilproxy.WriteLine(filterChains, "*filter") + filterChains := &utilproxy.LineBuffer{} + filterRules := &utilproxy.LineBuffer{} + filterChains.Write("*filter") for _, chain := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} { if _, found := existingFilterChains[chain]; found { chainString := string(chain) - utilproxy.WriteBytesLine(filterChains, existingFilterChains[chain]) - utilproxy.WriteLine(filterRules, "-X", chainString) + filterChains.WriteBytes(existingFilterChains[chain]) + filterRules.Write("-X", chainString) } } - utilproxy.WriteLine(filterRules, "COMMIT") + filterRules.Write("COMMIT") filterLines := append(filterChains.Bytes(), filterRules.Bytes()...) // Write it. if err := ipt.Restore(utiliptables.TableFilter, filterLines, utiliptables.NoFlushTables, utiliptables.RestoreCounters); err != nil { @@ -894,23 +894,23 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Reset() // Write table headers. - utilproxy.WriteLine(proxier.filterChains, "*filter") - utilproxy.WriteLine(proxier.natChains, "*nat") + proxier.filterChains.Write("*filter") + proxier.natChains.Write("*nat") // Make sure we keep stats for the top-level chains, if they existed // (which most should have because we created them above). for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeExternalServicesChain, kubeForwardChain, kubeNodePortsChain} { if chain, ok := existingFilterChains[chainName]; ok { - utilproxy.WriteBytesLine(proxier.filterChains, chain) + proxier.filterChains.WriteBytes(chain) } else { - utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(chainName)) + proxier.filterChains.Write(utiliptables.MakeChainLine(chainName)) } } for _, chainName := range []utiliptables.Chain{kubeServicesChain, kubeNodePortsChain, kubePostroutingChain, KubeMarkMasqChain} { if chain, ok := existingNATChains[chainName]; ok { - utilproxy.WriteBytesLine(proxier.natChains, chain) + proxier.natChains.WriteBytes(chain) } else { - utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(chainName)) + proxier.natChains.Write(utiliptables.MakeChainLine(chainName)) } } @@ -918,13 +918,13 @@ func (proxier *Proxier) syncProxyRules() { // this so that it is easier to flush and change, for example if the mark // value should ever change. // NB: THIS MUST MATCH the corresponding code in the kubelet - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( "-A", string(kubePostroutingChain), "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), "-j", "RETURN", ) // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( "-A", string(kubePostroutingChain), "-j", "MARK", "--xor-mark", proxier.masqueradeMark, ) @@ -936,12 +936,12 @@ func (proxier *Proxier) syncProxyRules() { if proxier.iptables.HasRandomFully() { masqRule = append(masqRule, "--random-fully") } - utilproxy.WriteLine(proxier.natRules, masqRule...) + proxier.natRules.Write(masqRule...) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( "-A", string(KubeMarkMasqChain), "-j", "MARK", "--or-mark", proxier.masqueradeMark, ) @@ -1057,16 +1057,16 @@ func (proxier *Proxier) syncProxyRules() { // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[endpointChain]; ok { - utilproxy.WriteBytesLine(proxier.natChains, chain) + proxier.natChains.WriteBytes(chain) } else { - utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) + proxier.natChains.Write(utiliptables.MakeChainLine(endpointChain)) } activeNATChains[endpointChain] = true args = append(args[:0], "-A", string(endpointChain)) args = proxier.appendServiceCommentLocked(args, svcNameString) // Handle traffic that loops back to the originator with SNAT. - utilproxy.WriteLine(proxier.natRules, append(args, + proxier.natRules.Write(append(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epInfo.IP())), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. @@ -1075,16 +1075,16 @@ func (proxier *Proxier) syncProxyRules() { } // DNAT to final destination. args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } svcChain := svcInfo.servicePortChainName if hasEndpoints { // Create the per-service chain, retaining counters if possible. if chain, ok := existingNATChains[svcChain]; ok { - utilproxy.WriteBytesLine(proxier.natChains, chain) + proxier.natChains.WriteBytes(chain) } else { - utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) + proxier.natChains.Write(utiliptables.MakeChainLine(svcChain)) } activeNATChains[svcChain] = true } @@ -1094,9 +1094,9 @@ func (proxier *Proxier) syncProxyRules() { // Only for services request OnlyLocal traffic // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { - utilproxy.WriteBytesLine(proxier.natChains, lbChain) + proxier.natChains.WriteBytes(lbChain) } else { - utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) + proxier.natChains.Write(utiliptables.MakeChainLine(svcXlbChain)) } activeNATChains[svcXlbChain] = true } @@ -1112,7 +1112,7 @@ func (proxier *Proxier) syncProxyRules() { if proxier.masqueradeAll { args := prepend(args, "-A", string(svcChain)) args = append(args, "-j", string(KubeMarkMasqChain)) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, @@ -1121,14 +1121,14 @@ func (proxier *Proxier) syncProxyRules() { // If/when we support "Local" policy for VIPs, we should update this. args := prepend(args, "-A", string(svcChain)) args = proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain)) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } args = prepend(args, "-A", string(kubeServicesChain)) args = append(args, "-j", string(svcChain)) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } else { // No endpoints. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1192,21 +1192,21 @@ func (proxier *Proxier) syncProxyRules() { args := prepend(args, "-A", string(svcChain)) // This masquerades off-cluster traffic to a External IP. if proxier.localDetector.IsImplemented() { - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) } else { - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( append(args, "-j", string(KubeMarkMasqChain))...) } } // Send traffic bound for external IPs to the service chain. args = prepend(args, "-A", string(kubeServicesChain)) - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( append(args, "-j", string(destChain))...) } else { // No endpoints. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1224,9 +1224,9 @@ func (proxier *Proxier) syncProxyRules() { if hasEndpoints { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { - utilproxy.WriteBytesLine(proxier.natChains, chain) + proxier.natChains.WriteBytes(chain) } else { - utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) + proxier.natChains.Write(utiliptables.MakeChainLine(fwChain)) } activeNATChains[fwChain] = true // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. @@ -1241,7 +1241,7 @@ func (proxier *Proxier) syncProxyRules() { "--dport", strconv.Itoa(svcInfo.Port()), ) // jump to service firewall chain - utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(fwChain))...) + proxier.natRules.Write(append(args, "-j", string(fwChain))...) args = append(args[:0], "-A", string(fwChain), @@ -1253,18 +1253,18 @@ func (proxier *Proxier) syncProxyRules() { // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. if !svcInfo.NodeLocalExternal() { - utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(append(args, "-j", string(KubeMarkMasqChain))...) chosenChain = svcChain } if len(svcInfo.LoadBalancerSourceRanges()) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain - utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(chosenChain))...) + proxier.natRules.Write(append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges() { - utilproxy.WriteLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) + proxier.natRules.Write(append(args, "-s", src, "-j", string(chosenChain))...) _, cidr, err := netutils.ParseCIDRSloppy(src) if err != nil { klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) @@ -1276,16 +1276,16 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - utilproxy.WriteLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)), "-j", string(chosenChain))...) + proxier.natRules.Write(append(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)), "-j", string(chosenChain))...) } } // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP - utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + proxier.natRules.Write(append(args, "-j", string(KubeMarkDropChain))...) } else { // No endpoints. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1359,10 +1359,10 @@ func (proxier *Proxier) syncProxyRules() { ) if !svcInfo.NodeLocalExternal() { // Nodeports need SNAT, unless they're local. - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( append(prepend(args, "-A", string(svcChain)), "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( append(prepend(args, "-A", string(kubeNodePortsChain)), "-j", string(svcChain))...) } else { // TODO: Make all nodePorts jump to the firewall chain. @@ -1374,14 +1374,14 @@ func (proxier *Proxier) syncProxyRules() { loopback = "::1/128" } args = prepend(args, "-A", string(kubeNodePortsChain)) - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...) - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( append(args, "-j", string(svcXlbChain))...) } } else { // No endpoints. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "addrtype", "--dst-type", "LOCAL", @@ -1396,7 +1396,7 @@ func (proxier *Proxier) syncProxyRules() { if svcInfo.HealthCheckNodePort() != 0 { // no matter if node has local endpoints, healthCheckNodePorts // need to add a rule to accept the incoming connection - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeNodePortsChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s health check node port"`, svcNameString), "-m", "tcp", "-p", "tcp", @@ -1421,7 +1421,7 @@ func (proxier *Proxier) syncProxyRules() { "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "-j", string(endpointChain), ) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } } @@ -1440,7 +1440,7 @@ func (proxier *Proxier) syncProxyRules() { } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } // The logic below this applies only if this service is marked as OnlyLocal @@ -1457,17 +1457,17 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, ) - utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfLocal(args, string(svcChain))...) + proxier.natRules.Write(proxier.localDetector.JumpIfLocal(args, string(svcChain))...) } // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local // This allows traffic originating from the host to be redirected to the service correctly, // otherwise traffic to LB IPs are dropped if there are no local endpoints. args = append(args[:0], "-A", string(svcXlbChain)) - utilproxy.WriteLine(proxier.natRules, append(args, + proxier.natRules.Write(append(args, "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString), "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))...) - utilproxy.WriteLine(proxier.natRules, append(args, + proxier.natRules.Write(append(args, "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString), "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...) @@ -1481,12 +1481,12 @@ func (proxier *Proxier) syncProxyRules() { "-j", string(KubeMarkDropChain), ) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } else { // First write session affinity rules only over local endpoints, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { for _, endpointChain := range localEndpointChains { - utilproxy.WriteLine(proxier.natRules, + proxier.natRules.Write( "-A", string(svcXlbChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), @@ -1512,7 +1512,7 @@ func (proxier *Proxier) syncProxyRules() { } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } } } @@ -1528,8 +1528,8 @@ func (proxier *Proxier) syncProxyRules() { // We must (as per iptables) write a chain-line for it, which has // the nice effect of flushing the chain. Then we can remove the // chain. - utilproxy.WriteBytesLine(proxier.natChains, existingNATChains[chain]) - utilproxy.WriteLine(proxier.natRules, "-X", chainString) + proxier.natChains.WriteBytes(existingNATChains[chain]) + proxier.natRules.Write("-X", chainString) } } @@ -1544,7 +1544,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(kubeNodePortsChain)) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) // Nothing else matters after the zero CIDR. break } @@ -1559,13 +1559,13 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-d", address, "-j", string(kubeNodePortsChain)) - utilproxy.WriteLine(proxier.natRules, args...) + proxier.natRules.Write(args...) } // Drop the packets in INVALID state, which would potentially cause // unexpected connection reset. // https://github.com/kubernetes/kubernetes/issues/74839 - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeForwardChain), "-m", "conntrack", "--ctstate", "INVALID", @@ -1575,7 +1575,7 @@ func (proxier *Proxier) syncProxyRules() { // If the masqueradeMark has been added then we want to forward that same // traffic, this allows NodePort traffic to be forwarded even if the default // FORWARD policy is not accept. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding rules"`, "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), @@ -1585,14 +1585,14 @@ func (proxier *Proxier) syncProxyRules() { // The following two rules ensure the traffic after the initial packet // accepted by the "kubernetes forwarding rules" rule above will be // accepted. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", ) - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(kubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, "-m", "conntrack", @@ -1606,8 +1606,8 @@ func (proxier *Proxier) syncProxyRules() { metrics.IptablesRulesTotal.WithLabelValues(string(utiliptables.TableNAT)).Set(float64(numberNatIptablesRules)) // Write the end-of-table markers. - utilproxy.WriteLine(proxier.filterRules, "COMMIT") - utilproxy.WriteLine(proxier.natRules, "COMMIT") + proxier.filterRules.Write("COMMIT") + proxier.natRules.Write("COMMIT") // Sync rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 66eb9bcf92d..f0766c17a87 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -509,10 +509,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), existingFilterChainsData: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), + filterChains: utilproxy.LineBuffer{}, + filterRules: utilproxy.LineBuffer{}, + natChains: utilproxy.LineBuffer{}, + natRules: utilproxy.LineBuffer{}, nodePortAddresses: make([]string, 0), networkInterfacer: utilproxytest.NewFakeNetwork(), } diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index f887bb87867..b30bad99a69 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -261,10 +261,10 @@ type Proxier struct { // that are significantly impacting performance. iptablesData *bytes.Buffer filterChainsData *bytes.Buffer - natChains *bytes.Buffer - filterChains *bytes.Buffer - natRules *bytes.Buffer - filterRules *bytes.Buffer + natChains utilproxy.LineBuffer + filterChains utilproxy.LineBuffer + natRules utilproxy.LineBuffer + filterRules utilproxy.LineBuffer // Added as a member to the struct to allow injection for testing. netlinkHandle NetLinkHandle // ipsetList is the list of ipsets that ipvs proxier used. @@ -479,10 +479,10 @@ func NewProxier(ipt utiliptables.Interface, ipGetter: &realIPGetter{nl: NewNetLinkHandle(ipFamily == v1.IPv6Protocol)}, iptablesData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), + natChains: utilproxy.LineBuffer{}, + natRules: utilproxy.LineBuffer{}, + filterChains: utilproxy.LineBuffer{}, + filterRules: utilproxy.LineBuffer{}, netlinkHandle: NewNetLinkHandle(ipFamily == v1.IPv6Protocol), ipset: ipset, nodePortAddresses: nodePortAddresses, @@ -1043,8 +1043,8 @@ func (proxier *Proxier) syncProxyRules() { proxier.filterRules.Reset() // Write table headers. - utilproxy.WriteLine(proxier.filterChains, "*filter") - utilproxy.WriteLine(proxier.natChains, "*nat") + proxier.filterChains.Write("*filter") + proxier.natChains.Write("*nat") proxier.createAndLinkKubeChain() @@ -1704,7 +1704,7 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[set.name].Name, set.matchType, ) - utilproxy.WriteLine(proxier.natRules, append(args, "-j", set.to)...) + proxier.natRules.Write(append(args, "-j", set.to)...) } } @@ -1715,14 +1715,14 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, ) if proxier.masqueradeAll { - utilproxy.WriteLine(proxier.natRules, append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - utilproxy.WriteLine(proxier.natRules, proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) + proxier.natRules.Write(proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) } else { // Masquerade all OUTPUT traffic coming from a service ip. // The kube dummy interface has all service VIPs assigned which @@ -1731,7 +1731,7 @@ func (proxier *Proxier) writeIptablesRules() { // VIP:. // Always masquerading OUTPUT (node-originating) traffic with a VIP // source ip and service port destination fixes the outgoing connections. - utilproxy.WriteLine(proxier.natRules, append(args, "src,dst", "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(append(args, "src,dst", "-j", string(KubeMarkMasqChain))...) } } @@ -1744,11 +1744,11 @@ func (proxier *Proxier) writeIptablesRules() { externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") - utilproxy.WriteLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) + proxier.natRules.Write(append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. - utilproxy.WriteLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) + proxier.natRules.Write(append(dstLocalOnlyArgs, "-j", "ACCEPT")...) } if !proxier.ipsetList[kubeExternalIPSet].isEmpty() { @@ -1759,7 +1759,7 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, "dst,dst", ) - utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(append(args, "-j", string(KubeMarkMasqChain))...) externalIPRules(args) } @@ -1778,16 +1778,16 @@ func (proxier *Proxier) writeIptablesRules() { "-A", string(kubeServicesChain), "-m", "addrtype", "--dst-type", "LOCAL", ) - utilproxy.WriteLine(proxier.natRules, append(args, "-j", string(KubeNodePortChain))...) + proxier.natRules.Write(append(args, "-j", string(KubeNodePortChain))...) // mark drop for KUBE-LOAD-BALANCER - utilproxy.WriteLine(proxier.natRules, []string{ + proxier.natRules.Write([]string{ "-A", string(KubeLoadBalancerChain), "-j", string(KubeMarkMasqChain), }...) // mark drop for KUBE-FIRE-WALL - utilproxy.WriteLine(proxier.natRules, []string{ + proxier.natRules.Write([]string{ "-A", string(KubeFireWallChain), "-j", string(KubeMarkDropChain), }...) @@ -1800,7 +1800,7 @@ func (proxier *Proxier) writeIptablesRules() { // If the masqueradeMark has been added then we want to forward that same // traffic, this allows NodePort traffic to be forwarded even if the default // FORWARD policy is not accept. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(KubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding rules"`, "-m", "mark", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), @@ -1810,14 +1810,14 @@ func (proxier *Proxier) writeIptablesRules() { // The following two rules ensure the traffic after the initial packet // accepted by the "kubernetes forwarding rules" rule above will be // accepted. - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(KubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod source rule"`, "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "ACCEPT", ) - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(KubeForwardChain), "-m", "comment", "--comment", `"kubernetes forwarding conntrack pod destination rule"`, "-m", "conntrack", @@ -1826,7 +1826,7 @@ func (proxier *Proxier) writeIptablesRules() { ) // Add rule to accept traffic towards health check node port - utilproxy.WriteLine(proxier.filterRules, + proxier.filterRules.Write( "-A", string(KubeNodePortChain), "-m", "comment", "--comment", proxier.ipsetList[kubeHealthCheckNodePortSet].getComment(), "-m", "set", "--match-set", proxier.ipsetList[kubeHealthCheckNodePortSet].Name, "dst", @@ -1837,13 +1837,13 @@ func (proxier *Proxier) writeIptablesRules() { // this so that it is easier to flush and change, for example if the mark // value should ever change. // NB: THIS MUST MATCH the corresponding code in the kubelet - utilproxy.WriteLine(proxier.natRules, []string{ + proxier.natRules.Write([]string{ "-A", string(kubePostroutingChain), "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), "-j", "RETURN", }...) // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. - utilproxy.WriteLine(proxier.natRules, []string{ + proxier.natRules.Write([]string{ "-A", string(kubePostroutingChain), // XOR proxier.masqueradeMark to unset it "-j", "MARK", "--xor-mark", proxier.masqueradeMark, @@ -1856,19 +1856,19 @@ func (proxier *Proxier) writeIptablesRules() { if proxier.iptables.HasRandomFully() { masqRule = append(masqRule, "--random-fully") } - utilproxy.WriteLine(proxier.natRules, masqRule...) + proxier.natRules.Write(masqRule...) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. - utilproxy.WriteLine(proxier.natRules, []string{ + proxier.natRules.Write([]string{ "-A", string(KubeMarkMasqChain), "-j", "MARK", "--or-mark", proxier.masqueradeMark, }...) // Write the end-of-table markers. - utilproxy.WriteLine(proxier.filterRules, "COMMIT") - utilproxy.WriteLine(proxier.natRules, "COMMIT") + proxier.filterRules.Write("COMMIT") + proxier.natRules.Write("COMMIT") } func (proxier *Proxier) acceptIPVSTraffic() { @@ -1882,7 +1882,7 @@ func (proxier *Proxier) acceptIPVSTraffic() { default: matchType = "dst,dst" } - utilproxy.WriteLine(proxier.natRules, []string{ + proxier.natRules.Write([]string{ "-A", string(kubeServicesChain), "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType, "-j", "ACCEPT", @@ -1912,15 +1912,15 @@ func (proxier *Proxier) createAndLinkKubeChain() { } if ch.table == utiliptables.TableNAT { if chain, ok := existingNATChains[ch.chain]; ok { - utilproxy.WriteBytesLine(proxier.natChains, chain) + proxier.natChains.WriteBytes(chain) } else { - utilproxy.WriteLine(proxier.natChains, utiliptables.MakeChainLine(ch.chain)) + proxier.natChains.Write(utiliptables.MakeChainLine(ch.chain)) } } else { if chain, ok := existingFilterChains[ch.chain]; ok { - utilproxy.WriteBytesLine(proxier.filterChains, chain) + proxier.filterChains.WriteBytes(chain) } else { - utilproxy.WriteLine(proxier.filterChains, utiliptables.MakeChainLine(ch.chain)) + proxier.filterChains.Write(utiliptables.MakeChainLine(ch.chain)) } } } diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index 0b83fac558c..0654d25c9c8 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" + utilproxy "k8s.io/kubernetes/pkg/proxy/util" proxyutiliptables "k8s.io/kubernetes/pkg/proxy/util/iptables" proxyutiltest "k8s.io/kubernetes/pkg/proxy/util/testing" "k8s.io/kubernetes/pkg/util/async" @@ -159,10 +160,10 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset u ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, iptablesData: bytes.NewBuffer(nil), filterChainsData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), + natChains: utilproxy.LineBuffer{}, + natRules: utilproxy.LineBuffer{}, + filterChains: utilproxy.LineBuffer{}, + filterRules: utilproxy.LineBuffer{}, netlinkHandle: netlinktest.NewFakeNetlinkHandle(), ipsetList: ipsetList, nodePortAddresses: make([]string, 0), @@ -4650,8 +4651,8 @@ func TestCreateAndLinkKubeChain(t *testing.T) { expectedFilterChains := `:KUBE-FORWARD - [0:0] :KUBE-NODE-PORT - [0:0] ` - assert.Equal(t, expectedNATChains, fp.natChains.String()) - assert.Equal(t, expectedFilterChains, fp.filterChains.String()) + assert.Equal(t, expectedNATChains, string(fp.natChains.Bytes())) + assert.Equal(t, expectedFilterChains, string(fp.filterChains.Bytes())) } // This test ensures that the iptables proxier supports translating Endpoints to diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index a3b7c0b675f..04a5f4353ee 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -468,23 +468,35 @@ func GetClusterIPByFamily(ipFamily v1.IPFamily, service *v1.Service) string { return "" } -// WriteLine join all words with spaces, terminate with newline and write to buff. -func WriteLine(buf *bytes.Buffer, words ...string) { +type LineBuffer struct { + b bytes.Buffer +} + +// Write joins all words with spaces, terminates with newline and writes to buf. +func (buf *LineBuffer) Write(words ...string) { // We avoid strings.Join for performance reasons. for i := range words { - buf.WriteString(words[i]) + buf.b.WriteString(words[i]) if i < len(words)-1 { - buf.WriteByte(' ') + buf.b.WriteByte(' ') } else { - buf.WriteByte('\n') + buf.b.WriteByte('\n') } } } -// WriteBytesLine write bytes to buffer, terminate with newline -func WriteBytesLine(buf *bytes.Buffer, bytes []byte) { - buf.Write(bytes) - buf.WriteByte('\n') +// WriteBytes writes bytes to buffer, and terminates with newline. +func (buf *LineBuffer) WriteBytes(bytes []byte) { + buf.b.Write(bytes) + buf.b.WriteByte('\n') +} + +func (buf *LineBuffer) Reset() { + buf.b.Reset() +} + +func (buf *LineBuffer) Bytes() []byte { + return buf.b.Bytes() } // RevertPorts is closing ports in replacementPortsMap but not in originalPortsMap. In other words, it only diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index cab177150d7..391d7ea34af 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -17,7 +17,6 @@ limitations under the License. package util import ( - "bytes" "context" "fmt" "math/rand" @@ -1171,13 +1170,13 @@ func TestWriteLine(t *testing.T) { expected: "test1 test2 test3\n", }, } - testBuffer := bytes.NewBuffer(nil) + testBuffer := LineBuffer{} for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { testBuffer.Reset() - WriteLine(testBuffer, testCase.words...) - if !strings.EqualFold(testBuffer.String(), testCase.expected) { - t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.words, testCase.expected, testBuffer.String()) + testBuffer.Write(testCase.words...) + if want, got := testCase.expected, string(testBuffer.Bytes()); !strings.EqualFold(want, got) { + t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.words, want, got) } }) } @@ -1201,13 +1200,13 @@ func TestWriteBytesLine(t *testing.T) { }, } - testBuffer := bytes.NewBuffer(nil) + testBuffer := LineBuffer{} for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { testBuffer.Reset() - WriteBytesLine(testBuffer, testCase.bytes) - if !strings.EqualFold(testBuffer.String(), testCase.expected) { - t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.bytes, testCase.expected, testBuffer.String()) + testBuffer.WriteBytes(testCase.bytes) + if want, got := testCase.expected, string(testBuffer.Bytes()); !strings.EqualFold(want, got) { + t.Fatalf("write bytes is %v\n expected: %s, got: %s", testCase.bytes, want, got) } }) } @@ -1244,12 +1243,12 @@ func TestWriteCountLines(t *testing.T) { expected: 100000, }, } - testBuffer := bytes.NewBuffer(nil) + testBuffer := LineBuffer{} for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { testBuffer.Reset() for i := 0; i < testCase.expected; i++ { - WriteLine(testBuffer, randSeq()) + testBuffer.Write(randSeq()) } n := CountBytesLines(testBuffer.Bytes()) if n != testCase.expected {