From 070f393bc80189f9e6735e987195edbcab08d1ef Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Wed, 24 May 2017 12:53:11 +0200 Subject: [PATCH] Precompute probabilities in iptables kube-proxy. --- pkg/proxy/iptables/proxier.go | 92 +++++++++++++++++++----------- pkg/proxy/iptables/proxier_test.go | 1 + 2 files changed, 61 insertions(+), 32 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a29667cb625..769f959ee17 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -354,6 +354,11 @@ type Proxier struct { healthChecker healthcheck.Server healthzServer healthcheck.HealthzUpdater + // Since converting probabilities (floats) to strings is expensive + // and we are using only probabilities in the format of 1/n, we are + // precomputing some number of those and cache for future reuse. + precomputedProbabilities []string + // The following buffers are used to reuse memory and avoid allocations // that are significantly impacting performance. iptablesData *bytes.Buffer @@ -449,27 +454,28 @@ func NewProxier(ipt utiliptables.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps proxier := &Proxier{ - portsMap: make(map[localPort]closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - iptablesData: bytes.NewBuffer(nil), - filterChains: bytes.NewBuffer(nil), - filterRules: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), + portsMap: make(map[localPort]closeable), + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(hostname), + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + precomputedProbabilities: make([]string, 0, 1001), + iptablesData: bytes.NewBuffer(nil), + filterChains: bytes.NewBuffer(nil), + filterRules: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), } burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) @@ -565,6 +571,28 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { return encounteredError } +func computeProbability(n int) string { + return fmt.Sprintf("%0.5f", 1.0/float64(n)) +} + +// This assumes proxier.mu is held +func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) { + if len(proxier.precomputedProbabilities) == 0 { + proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "") + } + for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ { + proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i)) + } +} + +// This assumes proxier.mu is held +func (proxier *Proxier) probability(n int) string { + if n >= len(proxier.precomputedProbabilities) { + proxier.precomputeProbabilities(n) + } + return proxier.precomputedProbabilities[n] +} + // Sync is called to synchronize the proxier state to iptables as soon as possible. func (proxier *Proxier) Sync() { proxier.syncRunner.Run() @@ -1104,7 +1132,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), - "--dport", fmt.Sprintf("%d", svcInfo.port), + "--dport", strconv.Itoa(svcInfo.port), } if proxier.masqueradeAll { writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) @@ -1154,7 +1182,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", externalIP), - "--dport", fmt.Sprintf("%d", svcInfo.port), + "--dport", strconv.Itoa(svcInfo.port), } // We have to SNAT packets to external IPs. writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) @@ -1180,7 +1208,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", externalIP), - "--dport", fmt.Sprintf("%d", svcInfo.port), + "--dport", strconv.Itoa(svcInfo.port), "-j", "REJECT", ) } @@ -1206,7 +1234,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", ingress.IP), - "--dport", fmt.Sprintf("%d", svcInfo.port), + "--dport", strconv.Itoa(svcInfo.port), } // jump to service firewall chain writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) @@ -1284,7 +1312,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(kubeNodePortsChain), "-m", "comment", "--comment", svcNameString, "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", svcInfo.nodePort), + "--dport", strconv.Itoa(svcInfo.nodePort), } if !svcInfo.onlyNodeLocalEndpoints { // Nodeports need SNAT, unless they're local. @@ -1307,7 +1335,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "addrtype", "--dst-type", "LOCAL", "-m", protocol, "-p", protocol, - "--dport", fmt.Sprintf("%d", svcInfo.nodePort), + "--dport", strconv.Itoa(svcInfo.nodePort), "-j", "REJECT", ) } @@ -1320,7 +1348,7 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), - "--dport", fmt.Sprintf("%d", svcInfo.port), + "--dport", strconv.Itoa(svcInfo.port), "-j", "REJECT", ) continue @@ -1354,7 +1382,7 @@ func (proxier *Proxier) syncProxyRules() { "-A", string(svcChain), "-m", "comment", "--comment", svcNameString, "-m", "recent", "--name", string(endpointChain), - "--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap", + "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap", "-j", string(endpointChain)) } } @@ -1372,7 +1400,7 @@ func (proxier *Proxier) syncProxyRules() { args = append(args, "-m", "statistic", "--mode", "random", - "--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i))) + "--probability", proxier.probability(n-i)) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) @@ -1419,7 +1447,7 @@ func (proxier *Proxier) syncProxyRules() { args = []string{ "-A", string(svcXlbChain), "-m", "comment", "--comment", - fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`), + "\"Redirect pods trying to reach external loadbalancer VIP to clusterIP\"", "-s", proxier.clusterCIDR, "-j", string(svcChain), } @@ -1451,7 +1479,7 @@ func (proxier *Proxier) syncProxyRules() { args = append(args, "-m", "statistic", "--mode", "random", - "--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i))) + "--probability", proxier.probability(numLocalEndpoints-i)) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 33f2386c467..99710f0612a 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -397,6 +397,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { portsMap: make(map[localPort]closeable), portMapper: &fakePortOpener{[]*localPort{}}, healthChecker: newFakeHealthChecker(), + precomputedProbabilities: make([]string, 0, 1001), iptablesData: bytes.NewBuffer(nil), filterChains: bytes.NewBuffer(nil), filterRules: bytes.NewBuffer(nil),