From e3bb755270fbf47cf97a4e0a550a7c748ec9aa8e Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 18 May 2017 12:25:10 +0200 Subject: [PATCH] Reuse buffers for generated iptables rules --- pkg/proxy/iptables/proxier.go | 139 ++++++++++++++++------------- pkg/proxy/iptables/proxier_test.go | 4 + 2 files changed, 80 insertions(+), 63 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index 1eb90c5f8bb..b43502cffe9 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -306,6 +306,10 @@ type Proxier struct { // The following buffers are used to reuse memory and avoid allocations // that are significantly impacting performance. iptablesLines *bytes.Buffer + filterChains *bytes.Buffer + filterRules *bytes.Buffer + natChains *bytes.Buffer + natRules *bytes.Buffer } type localPort struct { @@ -422,6 +426,10 @@ func NewProxier(ipt utiliptables.Interface, healthChecker: healthChecker, healthzServer: healthzServer, iptablesLines: bytes.NewBuffer(nil), + filterChains: bytes.NewBuffer(nil), + filterRules: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), }, nil } @@ -998,47 +1006,48 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesLines.Bytes()) } - filterChains := bytes.NewBuffer(nil) - filterRules := bytes.NewBuffer(nil) - natChains := bytes.NewBuffer(nil) - natRules := bytes.NewBuffer(nil) + // Reset all buffers used later. + proxier.filterChains.Reset() + proxier.filterRules.Reset() + proxier.natChains.Reset() + proxier.natRules.Reset() // Write table headers. - writeLine(filterChains, "*filter") - writeLine(natChains, "*nat") + writeLine(proxier.filterChains, "*filter") + writeLine(proxier.natChains, "*nat") // Make sure we keep stats for the top-level chains, if they existed // (which most should have because we created them above). 
if chain, ok := existingFilterChains[kubeServicesChain]; ok { - writeLine(filterChains, chain) + writeLine(proxier.filterChains, chain) } else { - writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain)) + writeLine(proxier.filterChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeServicesChain]; ok { - writeLine(natChains, chain) + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeNodePortsChain]; ok { - writeLine(natChains, chain) + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) } if chain, ok := existingNATChains[kubePostroutingChain]; ok { - writeLine(natChains, chain) + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) } if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { - writeLine(natChains, chain) + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) } // Install the kubernetes-specific postrouting rules. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. 
- writeLine(natRules, []string{ + writeLine(proxier.natRules, []string{ "-A", string(kubePostroutingChain), "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, "-m", "mark", "--mark", proxier.masqueradeMark, @@ -1048,7 +1057,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. - writeLine(natRules, []string{ + writeLine(proxier.natRules, []string{ "-A", string(KubeMarkMasqChain), "-j", "MARK", "--set-xmark", proxier.masqueradeMark, }...) @@ -1069,9 +1078,9 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // Create the per-service chain, retaining counters if possible. svcChain := servicePortChainName(svcNameString, protocol) if chain, ok := existingNATChains[svcChain]; ok { - writeLine(natChains, chain) + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(svcChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain)) } activeNATChains[svcChain] = true @@ -1080,9 +1089,9 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // Only for services request OnlyLocal traffic // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { - writeLine(natChains, lbChain) + writeLine(proxier.natChains, lbChain) } else { - writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(svcXlbChain)) } activeNATChains[svcXlbChain] = true } else if activeNATChains[svcXlbChain] { @@ -1099,12 +1108,12 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { "--dport", fmt.Sprintf("%d", svcInfo.port), } if proxier.masqueradeAll { - writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) 
+ writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) } if len(proxier.clusterCIDR) > 0 { - writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) + writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) } - writeLine(natRules, append(args, "-j", string(svcChain))...) + writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) // Capture externalIPs. for _, externalIP := range svcInfo.externalIPs { @@ -1149,7 +1158,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets to external IPs. - writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // nor from a local process to be forwarded to the service. @@ -1158,16 +1167,16 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") - writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) + writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. - writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) + writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) 
// If the service has no endpoints then reject packets coming via externalIP // Install ICMP Reject rule in filter table for destination=externalIP and dport=svcport if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(filterRules, + writeLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1184,9 +1193,9 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // create service firewall chain fwChain := serviceFirewallChainName(svcNameString, protocol) if chain, ok := existingNATChains[fwChain]; ok { - writeLine(natChains, chain) + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(fwChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(fwChain)) } activeNATChains[fwChain] = true // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. @@ -1201,7 +1210,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { "--dport", fmt.Sprintf("%d", svcInfo.port), } // jump to service firewall chain - writeLine(natRules, append(args, "-j", string(fwChain))...) + writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) args = []string{ "-A", string(fwChain), @@ -1213,18 +1222,18 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. if !svcInfo.onlyNodeLocalEndpoints { - writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) chosenChain = svcChain } if len(svcInfo.loadBalancerSourceRanges) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain - writeLine(natRules, append(args, "-j", string(chosenChain))...) 
+ writeLine(proxier.natRules, append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.loadBalancerSourceRanges { - writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...) + writeLine(proxier.natRules, append(args, "-s", src, "-j", string(chosenChain))...) // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) if cidr.Contains(proxier.nodeIP) { @@ -1235,13 +1244,13 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) + writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) } } // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP - writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) } } @@ -1280,13 +1289,13 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } if !svcInfo.onlyNodeLocalEndpoints { // Nodeports need SNAT, unless they're local. - writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. - writeLine(natRules, append(args, "-j", string(svcChain))...) + writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { // TODO: Make all nodePorts jump to the firewall chain. // Currently we only create it for loadbalancers (#33586). - writeLine(natRules, append(args, "-j", string(svcXlbChain))...) 
+ writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) } // If the service has no endpoints then reject packets. The filter @@ -1294,7 +1303,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // the nat table does, so we just stick this into the kube-services // chain. if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(filterRules, + writeLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "addrtype", "--dst-type", "LOCAL", @@ -1307,7 +1316,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // If the service has no endpoints then reject packets. if len(proxier.endpointsMap[svcName]) == 0 { - writeLine(filterRules, + writeLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, @@ -1332,9 +1341,9 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { - writeLine(natChains, chain) + writeLine(proxier.natChains, chain) } else { - writeLine(natChains, utiliptables.MakeChainLine(endpointChain)) + writeLine(proxier.natChains, utiliptables.MakeChainLine(endpointChain)) } activeNATChains[endpointChain] = true } @@ -1342,7 +1351,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // First write session affinity rules, if applicable. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { for _, endpointChain := range endpointChains { - writeLine(natRules, + writeLine(proxier.natRules, "-A", string(svcChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), @@ -1368,7 +1377,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } // The final (or only if n == 1) rule is a guaranteed match. 
args = append(args, "-j", string(endpointChain)) - writeLine(natRules, args...) + writeLine(proxier.natRules, args...) // Rules in the per-endpoint chain. args = []string{ @@ -1376,7 +1385,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { "-m", "comment", "--comment", svcNameString, } // Handle traffic that loops back to the originator with SNAT. - writeLine(natRules, append(args, + writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].endpoint, ":")[0]), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. @@ -1385,7 +1394,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } // DNAT to final destination. args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].endpoint) - writeLine(natRules, args...) + writeLine(proxier.natRules, args...) } // The logic below this applies only if this service is marked as OnlyLocal @@ -1415,7 +1424,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { "-s", proxier.clusterCIDR, "-j", string(svcChain), } - writeLine(natRules, args...) + writeLine(proxier.natRules, args...) } numLocalEndpoints := len(localEndpointChains) @@ -1428,7 +1437,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { "-j", string(KubeMarkDropChain), } - writeLine(natRules, args...) + writeLine(proxier.natRules, args...) } else { // Setup probability filter rules only over local endpoints for i, endpointChain := range localEndpointChains { @@ -1447,7 +1456,7 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) - writeLine(natRules, args...) + writeLine(proxier.natRules, args...) } } } @@ -1463,33 +1472,37 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) { // We must (as per iptables) write a chain-line for it, which has // the nice effect of flushing the chain. 
Then we can remove the // chain. - writeLine(natChains, existingNATChains[chain]) - writeLine(natRules, "-X", chainString) + writeLine(proxier.natChains, existingNATChains[chain]) + writeLine(proxier.natRules, "-X", chainString) } } // Finally, tail-call to the nodeports chain. This needs to be after all // other service portal rules. - writeLine(natRules, + writeLine(proxier.natRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(kubeNodePortsChain)) // Write the end-of-table markers. - writeLine(filterRules, "COMMIT") - writeLine(natRules, "COMMIT") + writeLine(proxier.filterRules, "COMMIT") + writeLine(proxier.natRules, "COMMIT") // Sync rules. - // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. - filterLines := append(filterChains.Bytes(), filterRules.Bytes()...) - natLines := append(natChains.Bytes(), natRules.Bytes()...) - lines := append(filterLines, natLines...) 
+ // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. + proxier.iptablesLines.Reset() + proxier.iptablesLines.Write(proxier.filterChains.Bytes()) + proxier.iptablesLines.Write(proxier.filterRules.Bytes()) + proxier.iptablesLines.Write(proxier.natChains.Bytes()) + proxier.iptablesLines.Write(proxier.natRules.Bytes()) - glog.V(3).Infof("Restoring iptables rules: %s", lines) - err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) + if glog.V(4) { + glog.V(4).Infof("Restoring iptables rules: %s", proxier.iptablesLines.Bytes()) + } + err = proxier.iptables.RestoreAll(proxier.iptablesLines.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { - glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines) + glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, proxier.iptablesLines.Bytes()) // Revert new local ports. revertPorts(replacementPortsMap, proxier.portsMap) return diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index b037ca68efc..72a3e1b9ad3 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -396,6 +396,10 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { portMapper: &fakePortOpener{[]*localPort{}}, healthChecker: newFakeHealthChecker(), iptablesLines: bytes.NewBuffer(nil), + filterChains: bytes.NewBuffer(nil), + filterRules: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), } }