From 96882713b4d699b66127b92fdbfc7c4000021f24 Mon Sep 17 00:00:00 2001
From: m1093782566
Date: Wed, 30 Aug 2017 16:48:06 +0800
Subject: [PATCH] rsync iptables

---
 pkg/proxy/ipvs/proxier.go      | 74 ++++++++++++++++++++++------------
 pkg/proxy/ipvs/proxier_test.go |  4 ++
 2 files changed, 52 insertions(+), 26 deletions(-)

diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go
index 34e4ecae2ae..6c4ad357240 100644
--- a/pkg/proxy/ipvs/proxier.go
+++ b/pkg/proxy/ipvs/proxier.go
@@ -126,6 +126,11 @@ type Proxier struct {
 	ipvsScheduler string
 	// Added as a member to the struct to allow injection for testing.
 	ipGetter IPGetter
+	// The following buffers are used to reuse memory and avoid allocations
+	// that are significantly impacting performance.
+	iptablesData *bytes.Buffer
+	natChains    *bytes.Buffer
+	natRules     *bytes.Buffer
 }
 
 // IPGetter helps get node network interface IP
@@ -268,6 +273,9 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
 		ipvs:          ipvs,
 		ipvsScheduler: scheduler,
 		ipGetter:      &realIPGetter{},
+		iptablesData:  bytes.NewBuffer(nil),
+		natChains:     bytes.NewBuffer(nil),
+		natRules:      bytes.NewBuffer(nil),
 	}, nil
 }
 
@@ -859,28 +867,30 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	// Get iptables-save output so we can check for existing chains and rules.
 	// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
 	existingNATChains := make(map[utiliptables.Chain]string)
-	iptablesData := bytes.NewBuffer(nil)
-	err := proxier.iptables.SaveInto(utiliptables.TableNAT, iptablesData)
+	proxier.iptablesData.Reset()
+	err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
 	if err != nil { // if we failed to get any rules
 		glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
 	} else { // otherwise parse the output
-		existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes())
+		existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
 	}
 
-	natChains := bytes.NewBuffer(nil)
-	natRules := bytes.NewBuffer(nil)
+	// Reset all buffers used later.
+	// This is to avoid memory reallocations and thus improve performance.
+	proxier.natChains.Reset()
+	proxier.natRules.Reset()
 
 	// Write table headers.
-	writeLine(natChains, "*nat")
+	writeLine(proxier.natChains, "*nat")
 
 	// Make sure we keep stats for the top-level chains, if they existed
 	// (which most should have because we created them above).
 	if chain, ok := existingNATChains[kubePostroutingChain]; ok {
-		writeLine(natChains, chain)
+		writeLine(proxier.natChains, chain)
 	} else {
-		writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain))
+		writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
 	}
 
 	// Install the kubernetes-specific postrouting rules. We use a whole chain for
 	// this so that it is easier to flush and change, for example if the mark
 	// value should ever change.
-	writeLine(natRules, []string{
+	writeLine(proxier.natRules, []string{
 		"-A", string(kubePostroutingChain),
 		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
 		"-m", "mark", "--mark", proxier.masqueradeMark,
@@ -888,14 +898,14 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}...)
 
 	if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
-		writeLine(natChains, chain)
+		writeLine(proxier.natChains, chain)
 	} else {
-		writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
+		writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
 	}
 
 	// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
 	// this so that it is easier to flush and change, for example if the mark
 	// value should ever change.
-	writeLine(natRules, []string{
+	writeLine(proxier.natRules, []string{
 		"-A", string(KubeMarkMasqChain),
 		"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
 	}...)
@@ -915,6 +925,18 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	// currentIPVSServices represent IPVS services listed from the system
 	currentIPVSServices := make(map[string]*utilipvs.VirtualServer)
 
+	// We create this slice once here to avoid memory reallocations
+	// in every loop. Note that to reuse the memory, instead of doing:
+	//   slice = <some new slice>
+	// you should always do one of the below:
+	//   slice = slice[:0] // and then append to it
+	//   slice = append(slice[:0], ...)
+	// To avoid growing this slice, we arbitrarily set its size to 64,
+	// as there is never more than that many arguments for a single line.
+	// Note that even if we go over 64, it will still be correct - it
+	// is just for efficiency, not correctness.
+	args := make([]string, 64)
+
 	// Build IPVS rules for each service.
 	for svcName, svcInfo := range proxier.serviceMap {
 		protocol := strings.ToLower(string(svcInfo.protocol))
@@ -945,30 +967,30 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 			glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
 		}
 		// Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified.
-		args := []string{
+		args = append(args[:0],
 			"-A", string(kubeServicesChain),
 			"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
 			"-m", protocol, "-p", protocol,
 			"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
 			"--dport", strconv.Itoa(svcInfo.port),
-		}
+		)
 		if proxier.masqueradeAll {
-			err = proxier.linkKubeServiceChain(existingNATChains, natChains)
+			err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
 			if err != nil {
 				glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
 			}
-			writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+			writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
 		} else if len(proxier.clusterCIDR) > 0 {
 			// This masquerades off-cluster traffic to a service VIP. The idea
 			// is that you can establish a static route for your Service range,
 			// routing to any node, and that node will bridge into the Service
 			// for you. Since that might bounce off-node, we masquerade here.
 			// If/when we support "Local" policy for VIPs, we should update this.
-			err = proxier.linkKubeServiceChain(existingNATChains, natChains)
+			err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
 			if err != nil {
 				glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
 			}
-			writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
+			writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
 		}
 
 		// Capture externalIPs.
@@ -1029,24 +1051,24 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 		for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
 			if ingress.IP != "" {
 				if len(svcInfo.loadBalancerSourceRanges) != 0 {
-					err = proxier.linkKubeServiceChain(existingNATChains, natChains)
+					err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
 					if err != nil {
 						glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
 					}
 					// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
 					// This currently works for loadbalancers that preserves source ips.
 					// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
-					args := []string{
+					args = append(args[:0],
 						"-A", string(kubeServicesChain),
 						"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
 						"-m", string(svcInfo.protocol), "-p", string(svcInfo.protocol),
 						"-d", fmt.Sprintf("%s/32", ingress.IP),
 						"--dport", fmt.Sprintf("%d", svcInfo.port),
-					}
+					)
 					allowFromNode := false
 					for _, src := range svcInfo.loadBalancerSourceRanges {
-						writeLine(natRules, append(args, "-s", src, "-j", "ACCEPT")...)
+						writeLine(proxier.natRules, append(args, "-s", src, "-j", "ACCEPT")...)
 						// ignore error because it has been validated
 						_, cidr, _ := net.ParseCIDR(src)
 						if cidr.Contains(proxier.nodeIP) {
@@ -1057,12 +1079,12 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 					// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
 					// Need to add the following rule to allow request on host.
 					if allowFromNode {
-						writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", "ACCEPT")...)
+						writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", "ACCEPT")...)
 					}
 
 					// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
 					// It means the packet cannot go through the firewall, then DROP it.
-					writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...)
+					writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
 				}
 
 				serv := &utilipvs.VirtualServer{
@@ -1140,11 +1162,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}
 
 	// Write the end-of-table markers.
-	writeLine(natRules, "COMMIT")
+	writeLine(proxier.natRules, "COMMIT")
 
 	// Sync iptables rules.
 	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
-	natLines := append(natChains.Bytes(), natRules.Bytes()...)
+	natLines := append(proxier.natChains.Bytes(), proxier.natRules.Bytes()...)
 	lines := natLines
 
 	glog.V(3).Infof("Restoring iptables rules: %s", lines)
diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go
index c70fcf283a8..b02c6eae7ef 100644
--- a/pkg/proxy/ipvs/proxier_test.go
+++ b/pkg/proxy/ipvs/proxier_test.go
@@ -19,6 +19,7 @@ limitations under the License.
 package ipvs
 
 import (
+	"bytes"
 	"net"
 	"reflect"
 	"testing"
@@ -112,6 +113,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs
 		healthChecker:  newFakeHealthChecker(),
 		ipvsScheduler:  DefaultScheduler,
 		ipGetter:       &fakeIPGetter{nodeIPs: nodeIPs},
+		iptablesData:   bytes.NewBuffer(nil),
+		natChains:      bytes.NewBuffer(nil),
+		natRules:       bytes.NewBuffer(nil),
 	}
 }
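
Below is a minimal standalone Go sketch of the reuse pattern this patch applies: Reset() on long-lived bytes.Buffer fields and append(slice[:0], ...) on a preallocated slice, so each sync round reuses memory instead of allocating new buffers and slices. It is not part of the patch; ruleWriter, newRuleWriter, and sync are illustrative stand-ins for the proxier's own fields and helpers.

package main

import (
	"bytes"
	"fmt"
	"strings"
)

// writeLine mirrors the proxier helper: join the words with spaces and
// terminate the line with a newline.
func writeLine(buf *bytes.Buffer, words ...string) {
	buf.WriteString(strings.Join(words, " ") + "\n")
}

// ruleWriter holds long-lived buffers, as the patched Proxier struct does,
// so repeated sync rounds do not allocate new ones.
type ruleWriter struct {
	natRules *bytes.Buffer // reset with Reset() instead of bytes.NewBuffer(nil)
	args     []string      // reused via append(args[:0], ...)
}

func newRuleWriter() *ruleWriter {
	return &ruleWriter{
		natRules: bytes.NewBuffer(nil),
		// Sized up front so append(args[:0], ...) normally never regrows it.
		args: make([]string, 64),
	}
}

func (w *ruleWriter) sync(services []string) {
	// Reset keeps the buffer's underlying array, avoiding reallocation.
	w.natRules.Reset()
	for _, svc := range services {
		// Truncate to zero length, then append: the backing array is reused.
		w.args = append(w.args[:0],
			"-A", "KUBE-SERVICES",
			"-m", "comment", "--comment", fmt.Sprintf("%q cluster IP", svc),
		)
		writeLine(w.natRules, append(w.args, "-j", "KUBE-MARK-MASQ")...)
	}
}

func main() {
	w := newRuleWriter()
	for i := 0; i < 2; i++ { // two sync rounds sharing the same memory
		w.sync([]string{"default/kubernetes", "default/nginx"})
		fmt.Print(w.natRules.String())
	}
}

The sketch mirrors the diff above: bytes.Buffer.Reset keeps the underlying array for the next write, and append(args[:0], ...) truncates the slice to length zero while keeping its capacity, so the 64-element backing array is reused for every rule line.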