From 731dc8cf741e2f3dec72a37a95a638a2b92ff5bf Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Mon, 8 Nov 2021 15:14:49 -0800 Subject: [PATCH] Fix regression in kube-proxy (#106214) * Fix regression in kube-proxy Don't use a prepend() - that allocates. Instead, make Write() take either strings or slices (I wish we could express that better). * WIP: switch to intf * WIP: less appends * tests and ipvs --- pkg/proxy/iptables/proxier.go | 103 +++++++++++++++++++--------------- pkg/proxy/ipvs/proxier.go | 42 +++++++------- pkg/proxy/util/utils.go | 28 ++++++--- pkg/proxy/util/utils_test.go | 41 +++++++++----- 4 files changed, 126 insertions(+), 88 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 03a05f616f1..8c4e938e6b0 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -761,10 +761,6 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE } } -func prepend(sl []string, args ...string) []string { - return append(args, sl...) -} - const endpointChainsNumberThreshold = 1000 // Assumes proxier.mu is held. @@ -937,7 +933,7 @@ func (proxier *Proxier) syncProxyRules() { if proxier.iptables.HasRandomFully() { masqRule = append(masqRule, "--random-fully") } - proxier.natRules.Write(masqRule...) + proxier.natRules.Write(masqRule) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark @@ -1067,16 +1063,17 @@ func (proxier *Proxier) syncProxyRules() { args = append(args[:0], "-A", string(endpointChain)) args = proxier.appendServiceCommentLocked(args, svcNameString) // Handle traffic that loops back to the originator with SNAT. - proxier.natRules.Write(append(args, + proxier.natRules.Write( + args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(epInfo.IP())), - "-j", string(KubeMarkMasqChain))...) + "-j", string(KubeMarkMasqChain)) // Update client-affinity lists. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", epInfo.Endpoint) - proxier.natRules.Write(args...) + proxier.natRules.Write(args) } svcChain := svcInfo.servicePortChainName @@ -1111,22 +1108,24 @@ func (proxier *Proxier) syncProxyRules() { "--dport", strconv.Itoa(svcInfo.Port()), ) if proxier.masqueradeAll { - args := prepend(args, "-A", string(svcChain)) - args = append(args, "-j", string(KubeMarkMasqChain)) - proxier.natRules.Write(args...) + proxier.natRules.Write( + "-A", string(svcChain), + args, + "-j", string(KubeMarkMasqChain)) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - args := prepend(args, "-A", string(svcChain)) - args = proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain)) - proxier.natRules.Write(args...) + proxier.natRules.Write( + "-A", string(svcChain), + proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))) } - args = prepend(args, "-A", string(kubeServicesChain)) - args = append(args, "-j", string(svcChain)) - proxier.natRules.Write(args...) + proxier.natRules.Write( + "-A", string(kubeServicesChain), + args, + "-j", string(svcChain)) } else { // No endpoints. proxier.filterRules.Write( @@ -1189,21 +1188,25 @@ func (proxier *Proxier) syncProxyRules() { // be always forwarded to the corresponding Service, so no need to SNAT // If we can't differentiate the local traffic we always SNAT. if !svcInfo.NodeLocalExternal() { + appendTo := []string{"-A", string(svcChain)} destChain = svcChain - args := prepend(args, "-A", string(svcChain)) // This masquerades off-cluster traffic to a External IP. if proxier.localDetector.IsImplemented() { proxier.natRules.Write( - proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))...) + appendTo, + proxier.localDetector.JumpIfNotLocal(args, string(KubeMarkMasqChain))) } else { proxier.natRules.Write( - append(args, "-j", string(KubeMarkMasqChain))...) + appendTo, + args, + "-j", string(KubeMarkMasqChain)) } } // Send traffic bound for external IPs to the service chain. - args = prepend(args, "-A", string(kubeServicesChain)) proxier.natRules.Write( - append(args, "-j", string(destChain))...) + "-A", string(kubeServicesChain), + args, + "-j", string(destChain)) } else { // No endpoints. @@ -1242,7 +1245,7 @@ func (proxier *Proxier) syncProxyRules() { "--dport", strconv.Itoa(svcInfo.Port()), ) // jump to service firewall chain - proxier.natRules.Write(append(args, "-j", string(fwChain))...) + proxier.natRules.Write(args, "-j", string(fwChain)) args = append(args[:0], "-A", string(fwChain), @@ -1254,18 +1257,18 @@ func (proxier *Proxier) syncProxyRules() { // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. if !svcInfo.NodeLocalExternal() { - proxier.natRules.Write(append(args, "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(args, "-j", string(KubeMarkMasqChain)) chosenChain = svcChain } if len(svcInfo.LoadBalancerSourceRanges()) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain - proxier.natRules.Write(append(args, "-j", string(chosenChain))...) + proxier.natRules.Write(args, "-j", string(chosenChain)) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges() { - proxier.natRules.Write(append(args, "-s", src, "-j", string(chosenChain))...) + proxier.natRules.Write(args, "-s", src, "-j", string(chosenChain)) _, cidr, err := netutils.ParseCIDRSloppy(src) if err != nil { klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) @@ -1277,13 +1280,13 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - proxier.natRules.Write(append(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)), "-j", string(chosenChain))...) + proxier.natRules.Write(args, "-s", utilproxy.ToCIDR(netutils.ParseIPSloppy(ingress)), "-j", string(chosenChain)) } } // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP - proxier.natRules.Write(append(args, "-j", string(KubeMarkDropChain))...) + proxier.natRules.Write(args, "-j", string(KubeMarkDropChain)) } else { // No endpoints. proxier.filterRules.Write( @@ -1361,10 +1364,14 @@ func (proxier *Proxier) syncProxyRules() { if !svcInfo.NodeLocalExternal() { // Nodeports need SNAT, unless they're local. proxier.natRules.Write( - append(prepend(args, "-A", string(svcChain)), "-j", string(KubeMarkMasqChain))...) + "-A", string(svcChain), + args, + "-j", string(KubeMarkMasqChain)) // Jump to the service chain. proxier.natRules.Write( - append(prepend(args, "-A", string(kubeNodePortsChain)), "-j", string(svcChain))...) + "-A", string(kubeNodePortsChain), + args, + "-j", string(svcChain)) } else { // TODO: Make all nodePorts jump to the firewall chain. // Currently we only create it for loadbalancers (#33586). @@ -1374,11 +1381,15 @@ func (proxier *Proxier) syncProxyRules() { if isIPv6 { loopback = "::1/128" } - args = prepend(args, "-A", string(kubeNodePortsChain)) + appendTo := []string{"-A", string(kubeNodePortsChain)} proxier.natRules.Write( - append(args, "-s", loopback, "-j", string(KubeMarkMasqChain))...) + appendTo, + args, + "-s", loopback, "-j", string(KubeMarkMasqChain)) proxier.natRules.Write( - append(args, "-j", string(svcXlbChain))...) + appendTo, + args, + "-j", string(svcXlbChain)) } } else { // No endpoints. @@ -1422,7 +1433,7 @@ func (proxier *Proxier) syncProxyRules() { "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", "-j", string(endpointChain), ) - proxier.natRules.Write(args...) + proxier.natRules.Write(args) } } @@ -1440,8 +1451,7 @@ func (proxier *Proxier) syncProxyRules() { "--probability", proxier.probability(numReadyEndpoints-i)) } // The final (or only if n == 1) rule is a guaranteed match. - args = append(args, "-j", string(endpointChain)) - proxier.natRules.Write(args...) + proxier.natRules.Write(args, "-j", string(endpointChain)) } // The logic below this applies only if this service is marked as OnlyLocal @@ -1458,19 +1468,21 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`, ) - proxier.natRules.Write(proxier.localDetector.JumpIfLocal(args, string(svcChain))...) + proxier.natRules.Write(proxier.localDetector.JumpIfLocal(args, string(svcChain))) } // Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local // This allows traffic originating from the host to be redirected to the service correctly, // otherwise traffic to LB IPs are dropped if there are no local endpoints. args = append(args[:0], "-A", string(svcXlbChain)) - proxier.natRules.Write(append(args, + proxier.natRules.Write( + args, "-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString), - "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))...) - proxier.natRules.Write(append(args, + "-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain)) + proxier.natRules.Write( + args, "-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString), - "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))...) + "-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain)) numLocalEndpoints := len(localEndpointChains) if numLocalEndpoints == 0 { @@ -1482,7 +1494,7 @@ func (proxier *Proxier) syncProxyRules() { "-j", string(KubeMarkDropChain), ) - proxier.natRules.Write(args...) + proxier.natRules.Write(args) } else { // First write session affinity rules only over local endpoints, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { @@ -1512,8 +1524,7 @@ func (proxier *Proxier) syncProxyRules() { "--probability", proxier.probability(numLocalEndpoints-i)) } // The final (or only if n == 1) rule is a guaranteed match. - args = append(args, "-j", string(endpointChain)) - proxier.natRules.Write(args...) + proxier.natRules.Write(args, "-j", string(endpointChain)) } } } @@ -1545,7 +1556,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(kubeNodePortsChain)) - proxier.natRules.Write(args...) + proxier.natRules.Write(args) // Nothing else matters after the zero CIDR. break } @@ -1560,7 +1571,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-d", address, "-j", string(kubeNodePortsChain)) - proxier.natRules.Write(args...) + proxier.natRules.Write(args) } // Drop the packets in INVALID state, which would potentially cause diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index b30bad99a69..86ceee4ede3 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1704,7 +1704,7 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[set.name].Name, set.matchType, ) - proxier.natRules.Write(append(args, "-j", set.to)...) + proxier.natRules.Write(args, "-j", set.to) } } @@ -1715,14 +1715,14 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[kubeClusterIPSet].Name, ) if proxier.masqueradeAll { - proxier.natRules.Write(append(args, "dst,dst", "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(args, "dst,dst", "-j", string(KubeMarkMasqChain)) } else if proxier.localDetector.IsImplemented() { // This masquerades off-cluster traffic to a service VIP. The idea // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. - proxier.natRules.Write(proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))...) + proxier.natRules.Write(proxier.localDetector.JumpIfNotLocal(append(args, "dst,dst"), string(KubeMarkMasqChain))) } else { // Masquerade all OUTPUT traffic coming from a service ip. // The kube dummy interface has all service VIPs assigned which @@ -1731,7 +1731,7 @@ func (proxier *Proxier) writeIptablesRules() { // VIP:. // Always masquerading OUTPUT (node-originating) traffic with a VIP // source ip and service port destination fixes the outgoing connections. - proxier.natRules.Write(append(args, "src,dst", "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(args, "src,dst", "-j", string(KubeMarkMasqChain)) } } @@ -1744,11 +1744,11 @@ func (proxier *Proxier) writeIptablesRules() { externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") - proxier.natRules.Write(append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) + proxier.natRules.Write(externalTrafficOnlyArgs, "-j", "ACCEPT") dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. - proxier.natRules.Write(append(dstLocalOnlyArgs, "-j", "ACCEPT")...) + proxier.natRules.Write(dstLocalOnlyArgs, "-j", "ACCEPT") } if !proxier.ipsetList[kubeExternalIPSet].isEmpty() { @@ -1759,7 +1759,7 @@ func (proxier *Proxier) writeIptablesRules() { "-m", "set", "--match-set", proxier.ipsetList[kubeExternalIPSet].Name, "dst,dst", ) - proxier.natRules.Write(append(args, "-j", string(KubeMarkMasqChain))...) + proxier.natRules.Write(args, "-j", string(KubeMarkMasqChain)) externalIPRules(args) } @@ -1778,19 +1778,19 @@ func (proxier *Proxier) writeIptablesRules() { "-A", string(kubeServicesChain), "-m", "addrtype", "--dst-type", "LOCAL", ) - proxier.natRules.Write(append(args, "-j", string(KubeNodePortChain))...) + proxier.natRules.Write(args, "-j", string(KubeNodePortChain)) // mark drop for KUBE-LOAD-BALANCER - proxier.natRules.Write([]string{ + proxier.natRules.Write( "-A", string(KubeLoadBalancerChain), "-j", string(KubeMarkMasqChain), - }...) + ) // mark drop for KUBE-FIRE-WALL - proxier.natRules.Write([]string{ + proxier.natRules.Write( "-A", string(KubeFireWallChain), "-j", string(KubeMarkDropChain), - }...) + ) // Accept all traffic with destination of ipvs virtual service, in case other iptables rules // block the traffic, that may result in ipvs rules invalid. @@ -1837,17 +1837,17 @@ func (proxier *Proxier) writeIptablesRules() { // this so that it is easier to flush and change, for example if the mark // value should ever change. // NB: THIS MUST MATCH the corresponding code in the kubelet - proxier.natRules.Write([]string{ + proxier.natRules.Write( "-A", string(kubePostroutingChain), "-m", "mark", "!", "--mark", fmt.Sprintf("%s/%s", proxier.masqueradeMark, proxier.masqueradeMark), "-j", "RETURN", - }...) + ) // Clear the mark to avoid re-masquerading if the packet re-traverses the network stack. - proxier.natRules.Write([]string{ + proxier.natRules.Write( "-A", string(kubePostroutingChain), // XOR proxier.masqueradeMark to unset it "-j", "MARK", "--xor-mark", proxier.masqueradeMark, - }...) + ) masqRule := []string{ "-A", string(kubePostroutingChain), "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, @@ -1856,15 +1856,15 @@ func (proxier *Proxier) writeIptablesRules() { if proxier.iptables.HasRandomFully() { masqRule = append(masqRule, "--random-fully") } - proxier.natRules.Write(masqRule...) + proxier.natRules.Write(masqRule) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. - proxier.natRules.Write([]string{ + proxier.natRules.Write( "-A", string(KubeMarkMasqChain), "-j", "MARK", "--or-mark", proxier.masqueradeMark, - }...) + ) // Write the end-of-table markers. proxier.filterRules.Write("COMMIT") @@ -1882,11 +1882,11 @@ func (proxier *Proxier) acceptIPVSTraffic() { default: matchType = "dst,dst" } - proxier.natRules.Write([]string{ + proxier.natRules.Write( "-A", string(kubeServicesChain), "-m", "set", "--match-set", proxier.ipsetList[set].Name, matchType, "-j", "ACCEPT", - }...) + ) } } } diff --git a/pkg/proxy/util/utils.go b/pkg/proxy/util/utils.go index 04a5f4353ee..5cee78ccdb0 100644 --- a/pkg/proxy/util/utils.go +++ b/pkg/proxy/util/utils.go @@ -472,17 +472,29 @@ type LineBuffer struct { b bytes.Buffer } -// Write joins all words with spaces, terminates with newline and writes to buf. -func (buf *LineBuffer) Write(words ...string) { - // We avoid strings.Join for performance reasons. - for i := range words { - buf.b.WriteString(words[i]) - if i < len(words)-1 { +// Write takes a list of arguments, each a string or []string, joins all the +// individual strings with spaces, terminates with newline, and writes to buf. +// Any other argument type will panic. +func (buf *LineBuffer) Write(args ...interface{}) { + for i, arg := range args { + if i > 0 { buf.b.WriteByte(' ') - } else { - buf.b.WriteByte('\n') + } + switch x := arg.(type) { + case string: + buf.b.WriteString(x) + case []string: + for j, s := range x { + if j > 0 { + buf.b.WriteByte(' ') + } + buf.b.WriteString(s) + } + default: + panic(fmt.Sprintf("unknown argument type: %T", x)) } } + buf.b.WriteByte('\n') } // WriteBytes writes bytes to buffer, and terminates with newline. diff --git a/pkg/proxy/util/utils_test.go b/pkg/proxy/util/utils_test.go index 391d7ea34af..c61c7c2cb51 100644 --- a/pkg/proxy/util/utils_test.go +++ b/pkg/proxy/util/utils_test.go @@ -1148,41 +1148,56 @@ func TestRevertPorts(t *testing.T) { } } -func TestWriteLine(t *testing.T) { +func TestLineBufferWrite(t *testing.T) { testCases := []struct { name string - words []string + input []interface{} expected string }{ { - name: "write no word", - words: []string{}, - expected: "", + name: "none", + input: []interface{}{}, + expected: "\n", }, { - name: "write one word", - words: []string{"test1"}, + name: "one string", + input: []interface{}{"test1"}, expected: "test1\n", }, { - name: "write multi word", - words: []string{"test1", "test2", "test3"}, - expected: "test1 test2 test3\n", + name: "one slice", + input: []interface{}{[]string{"test1", "test2"}}, + expected: "test1 test2\n", + }, + { + name: "mixed", + input: []interface{}{"s1", "s2", []string{"s3", "s4"}, "", "s5", []string{}, []string{"s6"}, "s7"}, + expected: "s1 s2 s3 s4 s5 s6 s7\n", }, } testBuffer := LineBuffer{} for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { testBuffer.Reset() - testBuffer.Write(testCase.words...) + testBuffer.Write(testCase.input...) if want, got := testCase.expected, string(testBuffer.Bytes()); !strings.EqualFold(want, got) { - t.Fatalf("write word is %v\n expected: %s, got: %s", testCase.words, want, got) + t.Fatalf("write word is %v\n expected: %q, got: %q", testCase.input, want, got) } }) } } -func TestWriteBytesLine(t *testing.T) { +func TestLineBufferWritePanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("did not panic") + } + }() + testBuffer := LineBuffer{} + testBuffer.Write("string", []string{"a", "slice"}, 1234) +} + +func TestLineBufferWriteBytes(t *testing.T) { testCases := []struct { name string bytes []byte