mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Precompute probabilities in iptables kube-proxy.
This commit is contained in:
parent
1242e8ca20
commit
070f393bc8
@ -354,6 +354,11 @@ type Proxier struct {
|
||||
healthChecker healthcheck.Server
|
||||
healthzServer healthcheck.HealthzUpdater
|
||||
|
||||
// Since converting probabilities (floats) to strings is expensive
|
||||
// and we are using only probabilities in the format of 1/n, we are
|
||||
// precomputing some number of those and cache for future reuse.
|
||||
precomputedProbabilities []string
|
||||
|
||||
// The following buffers are used to reuse memory and avoid allocations
|
||||
// that are significantly impacting performance.
|
||||
iptablesData *bytes.Buffer
|
||||
@ -449,27 +454,28 @@ func NewProxier(ipt utiliptables.Interface,
|
||||
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
|
||||
|
||||
proxier := &Proxier{
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
iptables: ipt,
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
exec: exec,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
healthzServer: healthzServer,
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
filterChains: bytes.NewBuffer(nil),
|
||||
filterRules: bytes.NewBuffer(nil),
|
||||
natChains: bytes.NewBuffer(nil),
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
portsMap: make(map[localPort]closeable),
|
||||
serviceMap: make(proxyServiceMap),
|
||||
serviceChanges: newServiceChangeMap(),
|
||||
endpointsMap: make(proxyEndpointsMap),
|
||||
endpointsChanges: newEndpointsChangeMap(hostname),
|
||||
iptables: ipt,
|
||||
masqueradeAll: masqueradeAll,
|
||||
masqueradeMark: masqueradeMark,
|
||||
exec: exec,
|
||||
clusterCIDR: clusterCIDR,
|
||||
hostname: hostname,
|
||||
nodeIP: nodeIP,
|
||||
portMapper: &listenPortOpener{},
|
||||
recorder: recorder,
|
||||
healthChecker: healthChecker,
|
||||
healthzServer: healthzServer,
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
filterChains: bytes.NewBuffer(nil),
|
||||
filterRules: bytes.NewBuffer(nil),
|
||||
natChains: bytes.NewBuffer(nil),
|
||||
natRules: bytes.NewBuffer(nil),
|
||||
}
|
||||
burstSyncs := 2
|
||||
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
|
||||
@ -565,6 +571,28 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
|
||||
return encounteredError
|
||||
}
|
||||
|
||||
func computeProbability(n int) string {
|
||||
return fmt.Sprintf("%0.5f", 1.0/float64(n))
|
||||
}
|
||||
|
||||
// This assumes proxier.mu is held
|
||||
func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {
|
||||
if len(proxier.precomputedProbabilities) == 0 {
|
||||
proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>")
|
||||
}
|
||||
for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ {
|
||||
proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i))
|
||||
}
|
||||
}
|
||||
|
||||
// This assumes proxier.mu is held
|
||||
func (proxier *Proxier) probability(n int) string {
|
||||
if n >= len(proxier.precomputedProbabilities) {
|
||||
proxier.precomputeProbabilities(n)
|
||||
}
|
||||
return proxier.precomputedProbabilities[n]
|
||||
}
|
||||
|
||||
// Sync is called to synchronize the proxier state to iptables as soon as possible.
|
||||
func (proxier *Proxier) Sync() {
|
||||
proxier.syncRunner.Run()
|
||||
@ -1104,7 +1132,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
|
||||
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
}
|
||||
if proxier.masqueradeAll {
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
@ -1154,7 +1182,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", externalIP),
|
||||
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
}
|
||||
// We have to SNAT packets to external IPs.
|
||||
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
|
||||
@ -1180,7 +1208,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", externalIP),
|
||||
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
"-j", "REJECT",
|
||||
)
|
||||
}
|
||||
@ -1206,7 +1234,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", ingress.IP),
|
||||
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
}
|
||||
// jump to service firewall chain
|
||||
writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
|
||||
@ -1284,7 +1312,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-A", string(kubeNodePortsChain),
|
||||
"-m", "comment", "--comment", svcNameString,
|
||||
"-m", protocol, "-p", protocol,
|
||||
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
|
||||
"--dport", strconv.Itoa(svcInfo.nodePort),
|
||||
}
|
||||
if !svcInfo.onlyNodeLocalEndpoints {
|
||||
// Nodeports need SNAT, unless they're local.
|
||||
@ -1307,7 +1335,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
|
||||
"-m", "addrtype", "--dst-type", "LOCAL",
|
||||
"-m", protocol, "-p", protocol,
|
||||
"--dport", fmt.Sprintf("%d", svcInfo.nodePort),
|
||||
"--dport", strconv.Itoa(svcInfo.nodePort),
|
||||
"-j", "REJECT",
|
||||
)
|
||||
}
|
||||
@ -1320,7 +1348,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
|
||||
"-m", protocol, "-p", protocol,
|
||||
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
|
||||
"--dport", fmt.Sprintf("%d", svcInfo.port),
|
||||
"--dport", strconv.Itoa(svcInfo.port),
|
||||
"-j", "REJECT",
|
||||
)
|
||||
continue
|
||||
@ -1354,7 +1382,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
"-A", string(svcChain),
|
||||
"-m", "comment", "--comment", svcNameString,
|
||||
"-m", "recent", "--name", string(endpointChain),
|
||||
"--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap",
|
||||
"--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap",
|
||||
"-j", string(endpointChain))
|
||||
}
|
||||
}
|
||||
@ -1372,7 +1400,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
args = append(args,
|
||||
"-m", "statistic",
|
||||
"--mode", "random",
|
||||
"--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i)))
|
||||
"--probability", proxier.probability(n-i))
|
||||
}
|
||||
// The final (or only if n == 1) rule is a guaranteed match.
|
||||
args = append(args, "-j", string(endpointChain))
|
||||
@ -1419,7 +1447,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
args = []string{
|
||||
"-A", string(svcXlbChain),
|
||||
"-m", "comment", "--comment",
|
||||
fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`),
|
||||
"\"Redirect pods trying to reach external loadbalancer VIP to clusterIP\"",
|
||||
"-s", proxier.clusterCIDR,
|
||||
"-j", string(svcChain),
|
||||
}
|
||||
@ -1451,7 +1479,7 @@ func (proxier *Proxier) syncProxyRules() {
|
||||
args = append(args,
|
||||
"-m", "statistic",
|
||||
"--mode", "random",
|
||||
"--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i)))
|
||||
"--probability", proxier.probability(numLocalEndpoints-i))
|
||||
}
|
||||
// The final (or only if n == 1) rule is a guaranteed match.
|
||||
args = append(args, "-j", string(endpointChain))
|
||||
|
@ -397,6 +397,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
|
||||
portsMap: make(map[localPort]closeable),
|
||||
portMapper: &fakePortOpener{[]*localPort{}},
|
||||
healthChecker: newFakeHealthChecker(),
|
||||
precomputedProbabilities: make([]string, 0, 1001),
|
||||
iptablesData: bytes.NewBuffer(nil),
|
||||
filterChains: bytes.NewBuffer(nil),
|
||||
filterRules: bytes.NewBuffer(nil),
|
||||
|
Loading…
Reference in New Issue
Block a user