Merge pull request #51682 from m1093782566/ipvs-rsync-iptables

Automatic merge from submit-queue

rsync IPVS proxier to the HEAD of iptables

**What this PR does / why we need it**:

There was a significant performance improvement recently made to the iptables proxier. Since the IPVS proxier makes use of iptables in some use cases, I think we should re-sync the IPVS proxier to the HEAD of the iptables proxier so it picks up the same optimizations.

**Which issue this PR fixes**:

xref #51679 

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
Merged by Kubernetes Submit Queue on 2017-09-03 15:48:31 -07:00, committed by GitHub.
commit 7a219684a9
2 changed files with 52 additions and 26 deletions
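The optimization being ported is, at its core, buffer and slice reuse: rather than allocating fresh `bytes.Buffer`s and argument slices on every sync loop, the proxier keeps them as struct members and `Reset()`s them (see the diff below). A minimal sketch of why that matters follows; the benchmark names and the rule string are made up for illustration, this is not proxier code:

```go
package reuse

import (
	"bytes"
	"testing"
)

// Old pattern: a fresh buffer on every sync, so every run re-pays the
// cost of growing the buffer from scratch.
func BenchmarkFreshBuffer(b *testing.B) {
	for i := 0; i < b.N; i++ {
		buf := bytes.NewBuffer(nil)
		for j := 0; j < 1000; j++ {
			buf.WriteString("-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000\n")
		}
	}
}

// New pattern: one long-lived buffer, Reset() between syncs. Reset keeps
// the backing array, so steady-state iterations allocate almost nothing.
func BenchmarkReusedBuffer(b *testing.B) {
	buf := bytes.NewBuffer(nil)
	for i := 0; i < b.N; i++ {
		buf.Reset()
		for j := 0; j < 1000; j++ {
			buf.WriteString("-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000\n")
		}
	}
}
```

Running `go test -bench . -benchmem` on such a pair should show the reused variant with near-zero allocations per op once warm.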

pkg/proxy/ipvs/proxier.go

```diff
@@ -126,6 +126,11 @@ type Proxier struct {
 	ipvsScheduler string
 	// Added as a member to the struct to allow injection for testing.
 	ipGetter IPGetter
+	// The following buffers are used to reuse memory and avoid allocations
+	// that are significantly impacting performance.
+	iptablesData *bytes.Buffer
+	natChains    *bytes.Buffer
+	natRules     *bytes.Buffer
 }

 // IPGetter helps get node network interface IP
@@ -268,6 +273,9 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface,
 		ipvs:          ipvs,
 		ipvsScheduler: scheduler,
 		ipGetter:      &realIPGetter{},
+		iptablesData:  bytes.NewBuffer(nil),
+		natChains:     bytes.NewBuffer(nil),
+		natRules:      bytes.NewBuffer(nil),
 	}, nil
 }
@@ -859,28 +867,30 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	// Get iptables-save output so we can check for existing chains and rules.
 	// This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore
 	existingNATChains := make(map[utiliptables.Chain]string)
-	iptablesData := bytes.NewBuffer(nil)
-	err := proxier.iptables.SaveInto(utiliptables.TableNAT, iptablesData)
+	proxier.iptablesData.Reset()
+	err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
 	if err != nil { // if we failed to get any rules
 		glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
 	} else { // otherwise parse the output
-		existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesData.Bytes())
+		existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
 	}

-	natChains := bytes.NewBuffer(nil)
-	natRules := bytes.NewBuffer(nil)
+	// Reset all buffers used later.
+	// This is to avoid memory reallocations and thus improve performance.
+	proxier.natChains.Reset()
+	proxier.natRules.Reset()

 	// Write table headers.
-	writeLine(natChains, "*nat")
+	writeLine(proxier.natChains, "*nat")

 	// Make sure we keep stats for the top-level chains, if they existed
 	// (which most should have because we created them above).
 	if chain, ok := existingNATChains[kubePostroutingChain]; ok {
-		writeLine(natChains, chain)
+		writeLine(proxier.natChains, chain)
 	} else {
-		writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain))
+		writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
 	}

 	// Install the kubernetes-specific postrouting rules. We use a whole chain for
 	// this so that it is easier to flush and change, for example if the mark
 	// value should ever change.
-	writeLine(natRules, []string{
+	writeLine(proxier.natRules, []string{
 		"-A", string(kubePostroutingChain),
 		"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
 		"-m", "mark", "--mark", proxier.masqueradeMark,
@@ -888,14 +898,14 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}...)

 	if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
-		writeLine(natChains, chain)
+		writeLine(proxier.natChains, chain)
 	} else {
-		writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
+		writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
 	}

 	// Install the kubernetes-specific masquerade mark rule. We use a whole chain for
 	// this so that it is easier to flush and change, for example if the mark
 	// value should ever change.
-	writeLine(natRules, []string{
+	writeLine(proxier.natRules, []string{
 		"-A", string(KubeMarkMasqChain),
 		"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
 	}...)
@@ -915,6 +925,18 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	// currentIPVSServices represent IPVS services listed from the system
 	currentIPVSServices := make(map[string]*utilipvs.VirtualServer)

+	// We are creating those slices once here to avoid memory reallocations
+	// in every loop. Note that to reuse the memory, instead of doing:
+	//   slice = <some new slice>
+	// you should always do one of the below:
+	//   slice = slice[:0] // and then append to it
+	//   slice = append(slice[:0], ...)
+	// To avoid growing this slice, we arbitrarily set its size to 64,
+	// there is never more than that many arguments for a single line.
+	// Note that even if we go over 64, it will still be correct - it
+	// is just for efficiency, not correctness.
+	args := make([]string, 64)
+
 	// Build IPVS rules for each service.
 	for svcName, svcInfo := range proxier.serviceMap {
 		protocol := strings.ToLower(string(svcInfo.protocol))
```
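The comment block added above is the heart of the slice-reuse idiom. A standalone sketch (illustrative values, not proxier code) showing that `append(args[:0], ...)` keeps the original backing array, so each loop iteration writes into the same memory:

```go
package main

import "fmt"

func main() {
	// Pre-size once; 64 is an arbitrary cap, chosen so a typical rule
	// line never forces the slice to grow.
	args := make([]string, 64)

	// Truncate to length zero, then append: the capacity (and backing
	// array) survives, so no new allocation is needed.
	args = append(args[:0], "-A", "KUBE-SERVICES", "-j", "KUBE-MARK-MASQ")
	fmt.Println(len(args), cap(args)) // 5 64

	// The next iteration reuses the very same array.
	args = append(args[:0], "-A", "KUBE-POSTROUTING", "-j", "MASQUERADE")
	fmt.Println(len(args), cap(args)) // 4 64
}
```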
```diff
@@ -945,30 +967,30 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 			glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
 		}

 		// Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified.
-		args := []string{
+		args = append(args[:0],
 			"-A", string(kubeServicesChain),
 			"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
 			"-m", protocol, "-p", protocol,
 			"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
 			"--dport", strconv.Itoa(svcInfo.port),
-		}
+		)
 		if proxier.masqueradeAll {
-			err = proxier.linkKubeServiceChain(existingNATChains, natChains)
+			err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
 			if err != nil {
 				glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
 			}
-			writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...)
+			writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
 		} else if len(proxier.clusterCIDR) > 0 {
 			// This masquerades off-cluster traffic to a service VIP. The idea
 			// is that you can establish a static route for your Service range,
 			// routing to any node, and that node will bridge into the Service
 			// for you. Since that might bounce off-node, we masquerade here.
 			// If/when we support "Local" policy for VIPs, we should update this.
-			err = proxier.linkKubeServiceChain(existingNATChains, natChains)
+			err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
 			if err != nil {
 				glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
 			}
-			writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
+			writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...)
 		}

 		// Capture externalIPs.
```
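One subtlety in the hunk above: the `writeLine(..., append(args, ...)...)` calls do not reassign `args`, so each `append` writes its tail into the shared backing array and the next `append` overwrites it. That is safe here only because `writeLine` copies the words into the buffer immediately. A sketch of the gotcha if the appended slices were instead kept around (hypothetical, not proxier code):

```go
package main

import "fmt"

func main() {
	args := make([]string, 0, 8)
	args = append(args, "-A", "CHAIN")

	// Both appends fit in spare capacity, so both write into the same
	// backing array at the same indices.
	a := append(args, "-j", "ACCEPT")
	b := append(args, "-j", "DROP") // clobbers the tail written for a

	fmt.Println(a) // [-A CHAIN -j DROP], not the ACCEPT rule built above
	fmt.Println(b) // [-A CHAIN -j DROP]
}
```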
```diff
@@ -1029,24 +1051,24 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 		for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
 			if ingress.IP != "" {
 				if len(svcInfo.loadBalancerSourceRanges) != 0 {
-					err = proxier.linkKubeServiceChain(existingNATChains, natChains)
+					err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains)
 					if err != nil {
 						glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
 					}
 					// The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field.
 					// This currently works for loadbalancers that preserves source ips.
 					// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
-					args := []string{
+					args = append(args[:0],
 						"-A", string(kubeServicesChain),
 						"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
 						"-m", string(svcInfo.protocol), "-p", string(svcInfo.protocol),
 						"-d", fmt.Sprintf("%s/32", ingress.IP),
 						"--dport", fmt.Sprintf("%d", svcInfo.port),
-					}
+					)
 					allowFromNode := false
 					for _, src := range svcInfo.loadBalancerSourceRanges {
-						writeLine(natRules, append(args, "-s", src, "-j", "ACCEPT")...)
+						writeLine(proxier.natRules, append(args, "-s", src, "-j", "ACCEPT")...)
 						// ignore error because it has been validated
 						_, cidr, _ := net.ParseCIDR(src)
 						if cidr.Contains(proxier.nodeIP) {
@@ -1057,12 +1079,12 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 					// loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly.
 					// Need to add the following rule to allow request on host.
 					if allowFromNode {
-						writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", "ACCEPT")...)
+						writeLine(proxier.natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", "ACCEPT")...)
 					}
 					// If the packet was able to reach the end of firewall chain, then it did not get DNATed.
 					// It means the packet cannot go through the firewall, then DROP it.
-					writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...)
+					writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
 				}

 				serv := &utilipvs.VirtualServer{
@@ -1140,11 +1162,11 @@ func (proxier *Proxier) syncProxyRules(reason syncReason) {
 	}

 	// Write the end-of-table markers.
-	writeLine(natRules, "COMMIT")
+	writeLine(proxier.natRules, "COMMIT")

 	// Sync iptables rules.
 	// NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table.
-	natLines := append(natChains.Bytes(), natRules.Bytes()...)
+	natLines := append(proxier.natChains.Bytes(), proxier.natRules.Bytes()...)
 	lines := natLines

 	glog.V(3).Infof("Restoring iptables rules: %s", lines)
```
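After the last hunk, `natLines` is simply the chain declarations followed by the rules, fed to `iptables-restore` without flushing other chains. Pieced together from the rules written above, the restored payload looks roughly like this; the `0x4000/0x4000` mark and the final `-j MASQUERADE` target are assumed defaults (the hunk is truncated), and the per-service KUBE-SERVICES rules are omitted:

```
*nat
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
COMMIT
```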

pkg/proxy/ipvs/proxier_test.go

```diff
@@ -19,6 +19,7 @@ limitations under the License.
 package ipvs

 import (
+	"bytes"
 	"net"
 	"reflect"
 	"testing"
@@ -112,6 +113,9 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs
 		healthChecker: newFakeHealthChecker(),
 		ipvsScheduler: DefaultScheduler,
 		ipGetter:      &fakeIPGetter{nodeIPs: nodeIPs},
+		iptablesData:  bytes.NewBuffer(nil),
+		natChains:     bytes.NewBuffer(nil),
+		natRules:      bytes.NewBuffer(nil),
 	}
 }
```
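A note on why the fake must initialize these three buffers: `syncProxyRules` now calls `Reset()` on them unconditionally, and `bytes.Buffer` methods dereference their pointer receiver, so a fake proxier built with nil fields would panic on its first sync. A minimal illustration (not code from the PR):

```go
package main

import "bytes"

func main() {
	var buf *bytes.Buffer // an unset struct field looks like this
	buf.Reset()           // panics: invalid memory address or nil pointer dereference
}
```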