Merge pull request #46350 from wojtek-t/reduce_kube_proxy_allocations_2

Automatic merge from submit-queue (batch tested with PRs 45534, 37212, 46613, 46350)

Speed up and reduce number of memory allocations in kube-proxy

This is a second (and last PR) in this series - this solves all very-low-hanging fruits.

This PR:
- reduces cpu usage by ~25%
- reduces memory allocations by ~3x (together with #46033 by 10-12x)

Without this PR:
```
(pprof) top
8.59GB of 8.79GB total (97.75%)
Dropped 238 nodes (cum <= 0.04GB)
Showing top 10 nodes out of 64 (cum >= 0.11GB)
      flat  flat%   sum%        cum   cum%
    3.66GB 41.60% 41.60%     8.72GB 99.17%  k8s.io/kubernetes/pkg/proxy/iptables.(*Proxier).syncProxyRules
    3.07GB 34.96% 76.56%     3.07GB 34.96%  runtime.rawstringtmp
    0.62GB  7.09% 83.65%     0.62GB  7.09%  runtime.hashGrow
    0.34GB  3.82% 87.46%     0.34GB  3.82%  runtime.stringtoslicebyte
    0.29GB  3.24% 90.71%     0.58GB  6.61%  encoding/base32.(*Encoding).EncodeToString
    0.22GB  2.47% 93.18%     0.22GB  2.47%  strings.genSplit
    0.18GB  2.04% 95.22%     0.18GB  2.04%  runtime.convT2E
    0.11GB  1.22% 96.44%     0.73GB  8.36%  runtime.mapassign
    0.10GB  1.08% 97.52%     0.10GB  1.08%  syscall.ByteSliceFromString
    0.02GB  0.23% 97.75%     0.11GB  1.25%  syscall.SlicePtrFromStrings
```

with this PR:
```
(pprof) top
2.98GB of 3.08GB total (96.78%)
Dropped 246 nodes (cum <= 0.02GB)
Showing top 10 nodes out of 70 (cum >= 0.10GB)
      flat  flat%   sum%        cum   cum%
    1.99GB 64.60% 64.60%     1.99GB 64.60%  runtime.rawstringtmp
    0.58GB 18.95% 83.55%     0.58GB 18.95%  runtime.hashGrow
    0.10GB  3.40% 86.95%     0.69GB 22.47%  runtime.mapassign
    0.09GB  2.86% 89.80%     0.09GB  2.86%  syscall.ByteSliceFromString
    0.08GB  2.63% 92.44%     0.08GB  2.63%  runtime.convT2E
    0.03GB  1.13% 93.56%     0.03GB  1.13%  syscall.Environ
    0.03GB  0.99% 94.56%     0.03GB  0.99%  bytes.makeSlice
    0.03GB  0.97% 95.52%     0.03GB  1.06%  os.Stat
    0.02GB  0.65% 96.18%     3.01GB 97.79%  k8s.io/kubernetes/pkg/proxy/iptables.(*Proxier).syncProxyRules
    0.02GB   0.6% 96.78%     0.10GB  3.35%  syscall.SlicePtrFromStrings
```
This commit is contained in:
Kubernetes Submit Queue 2017-05-30 04:46:06 -07:00 committed by GitHub
commit 0f95f13dcc
3 changed files with 263 additions and 178 deletions

View File

@ -148,12 +148,39 @@ type serviceInfo struct {
loadBalancerSourceRanges []string loadBalancerSourceRanges []string
onlyNodeLocalEndpoints bool onlyNodeLocalEndpoints bool
healthCheckNodePort int healthCheckNodePort int
// The following fields are computed and stored for performance reasons.
serviceNameString string
servicePortChainName utiliptables.Chain
serviceFirewallChainName utiliptables.Chain
serviceLBChainName utiliptables.Chain
} }
// internal struct for endpoints information // internal struct for endpoints information
type endpointsInfo struct { type endpointsInfo struct {
endpoint string // TODO: should be an endpointString type endpoint string // TODO: should be an endpointString type
isLocal bool isLocal bool
// The following fields we lazily compute and store here for performance
// reasons. If the protocol is the same as you expect it to be, then the
// chainName can be reused, otherwise it should be recomputed.
protocol string
chainName utiliptables.Chain
}
// Returns just the IP part of the endpoint.
func (e *endpointsInfo) IPPart() string {
if index := strings.Index(e.endpoint, ":"); index != -1 {
return e.endpoint[0:index]
}
return e.endpoint
}
// Returns the endpoint chain name for a given endpointsInfo.
func (e *endpointsInfo) endpointChain(svcNameString, protocol string) utiliptables.Chain {
if e.protocol != protocol {
e.protocol = protocol
e.chainName = servicePortEndpointChainName(svcNameString, protocol, e.endpoint)
}
return e.chainName
} }
func (e *endpointsInfo) String() string { func (e *endpointsInfo) String() string {
@ -192,6 +219,13 @@ func newServiceInfo(svcPortName proxy.ServicePortName, port *api.ServicePort, se
} }
} }
// Store the following for performance reasons.
protocol := strings.ToLower(string(info.protocol))
info.serviceNameString = svcPortName.String()
info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol)
info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol)
info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol)
return info return info
} }
@ -346,6 +380,11 @@ type Proxier struct {
healthChecker healthcheck.Server healthChecker healthcheck.Server
healthzServer healthcheck.HealthzUpdater healthzServer healthcheck.HealthzUpdater
// Since converting probabilities (floats) to strings is expensive
// and we are using only probabilities in the format of 1/n, we are
// precomputing some number of those and cache for future reuse.
precomputedProbabilities []string
// The following buffers are used to reuse memory and avoid allocations // The following buffers are used to reuse memory and avoid allocations
// that are significantly impacting performance. // that are significantly impacting performance.
iptablesData *bytes.Buffer iptablesData *bytes.Buffer
@ -441,27 +480,28 @@ func NewProxier(ipt utiliptables.Interface,
healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps
proxier := &Proxier{ proxier := &Proxier{
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(hostname), endpointsChanges: newEndpointsChangeMap(hostname),
iptables: ipt, iptables: ipt,
masqueradeAll: masqueradeAll, masqueradeAll: masqueradeAll,
masqueradeMark: masqueradeMark, masqueradeMark: masqueradeMark,
exec: exec, exec: exec,
clusterCIDR: clusterCIDR, clusterCIDR: clusterCIDR,
hostname: hostname, hostname: hostname,
nodeIP: nodeIP, nodeIP: nodeIP,
portMapper: &listenPortOpener{}, portMapper: &listenPortOpener{},
recorder: recorder, recorder: recorder,
healthChecker: healthChecker, healthChecker: healthChecker,
healthzServer: healthzServer, healthzServer: healthzServer,
iptablesData: bytes.NewBuffer(nil), precomputedProbabilities: make([]string, 0, 1001),
filterChains: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil), filterChains: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil), filterRules: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
} }
burstSyncs := 2 burstSyncs := 2
glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
@ -557,6 +597,28 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
return encounteredError return encounteredError
} }
func computeProbability(n int) string {
return fmt.Sprintf("%0.5f", 1.0/float64(n))
}
// This assumes proxier.mu is held
func (proxier *Proxier) precomputeProbabilities(numberOfPrecomputed int) {
if len(proxier.precomputedProbabilities) == 0 {
proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, "<bad value>")
}
for i := len(proxier.precomputedProbabilities); i <= numberOfPrecomputed; i++ {
proxier.precomputedProbabilities = append(proxier.precomputedProbabilities, computeProbability(i))
}
}
// This assumes proxier.mu is held
func (proxier *Proxier) probability(n int) string {
if n >= len(proxier.precomputedProbabilities) {
proxier.precomputeProbabilities(n)
}
return proxier.precomputedProbabilities[n]
}
// Sync is called to synchronize the proxier state to iptables as soon as possible. // Sync is called to synchronize the proxier state to iptables as soon as possible.
func (proxier *Proxier) Sync() { func (proxier *Proxier) Sync() {
proxier.syncRunner.Run() proxier.syncRunner.Run()
@ -751,8 +813,7 @@ func getLocalIPs(endpointsMap proxyEndpointsMap) map[types.NamespacedName]sets.S
if localIPs[nsn] == nil { if localIPs[nsn] == nil {
localIPs[nsn] = sets.NewString() localIPs[nsn] = sets.NewString()
} }
ip := strings.Split(ep.endpoint, ":")[0] // just the IP part localIPs[nsn].Insert(ep.IPPart()) // just the IP part
localIPs[nsn].Insert(ip)
} }
} }
} }
@ -873,6 +934,13 @@ type endpointServicePair struct {
servicePortName proxy.ServicePortName servicePortName proxy.ServicePortName
} }
func (esp *endpointServicePair) IPPart() string {
if index := strings.Index(esp.endpoint, ":"); index != -1 {
return esp.endpoint[0:index]
}
return esp.endpoint
}
const noConnectionToDelete = "0 flow entries have been deleted" const noConnectionToDelete = "0 flow entries have been deleted"
// After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we // After a UDP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
@ -881,7 +949,7 @@ const noConnectionToDelete = "0 flow entries have been deleted"
func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) { func (proxier *Proxier) deleteEndpointConnections(connectionMap map[endpointServicePair]bool) {
for epSvcPair := range connectionMap { for epSvcPair := range connectionMap {
if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP { if svcInfo, ok := proxier.serviceMap[epSvcPair.servicePortName]; ok && svcInfo.protocol == api.ProtocolUDP {
endpointIP := strings.Split(epSvcPair.endpoint, ":")[0] endpointIP := epSvcPair.endpoint[0:strings.Index(epSvcPair.endpoint, ":")]
glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP) glog.V(2).Infof("Deleting connection tracking state for service IP %s, endpoint IP %s", svcInfo.clusterIP.String(), endpointIP)
err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp") err := utilproxy.ExecConntrackTool(proxier.exec, "-D", "--orig-dst", svcInfo.clusterIP.String(), "--dst-nat", endpointIP, "-p", "udp")
if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) { if err != nil && !strings.Contains(err.Error(), noConnectionToDelete) {
@ -1053,15 +1121,28 @@ func (proxier *Proxier) syncProxyRules() {
// Accumulate the set of local ports that we will be holding open once this update is complete // Accumulate the set of local ports that we will be holding open once this update is complete
replacementPortsMap := map[localPort]closeable{} replacementPortsMap := map[localPort]closeable{}
// We are creating those slices ones here to avoid memory reallocations
// in every loop. Note that reuse the memory, instead of doing:
// slice = <some new slice>
// you should always do one of the below:
// slice = slice[:0] // and then append to it
// slice = append(slice[:0], ...)
endpoints := make([]*endpointsInfo, 0)
endpointChains := make([]utiliptables.Chain, 0)
// To avoid growing this slice, we arbitrarily set its size to 64,
// there is never more than that many arguments for a single line.
// Note that even if we go over 64, it will still be correct - it
// is just for efficiency, not correctness.
args := make([]string, 64)
// Build rules for each service. // Build rules for each service.
var svcNameString string
for svcName, svcInfo := range proxier.serviceMap { for svcName, svcInfo := range proxier.serviceMap {
protocol := strings.ToLower(string(svcInfo.protocol)) protocol := strings.ToLower(string(svcInfo.protocol))
// Precompute svcNameString; with many services the many calls svcNameString = svcInfo.serviceNameString
// to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String()
// Create the per-service chain, retaining counters if possible. // Create the per-service chain, retaining counters if possible.
svcChain := servicePortChainName(svcNameString, protocol) svcChain := svcInfo.servicePortChainName
if chain, ok := existingNATChains[svcChain]; ok { if chain, ok := existingNATChains[svcChain]; ok {
writeLine(proxier.natChains, chain) writeLine(proxier.natChains, chain)
} else { } else {
@ -1069,7 +1150,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
activeNATChains[svcChain] = true activeNATChains[svcChain] = true
svcXlbChain := serviceLBChainName(svcNameString, protocol) svcXlbChain := svcInfo.serviceLBChainName
if svcInfo.onlyNodeLocalEndpoints { if svcInfo.onlyNodeLocalEndpoints {
// Only for services request OnlyLocal traffic // Only for services request OnlyLocal traffic
// create the per-service LB chain, retaining counters if possible. // create the per-service LB chain, retaining counters if possible.
@ -1085,13 +1166,13 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture the clusterIP. // Capture the clusterIP.
args := []string{ args = append(args[:0],
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", strconv.Itoa(svcInfo.port),
} )
if proxier.masqueradeAll { if proxier.masqueradeAll {
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
} }
@ -1135,13 +1216,13 @@ func (proxier *Proxier) syncProxyRules() {
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
} }
} // We're holding the port, so it's OK to install iptables rules. } // We're holding the port, so it's OK to install iptables rules.
args := []string{ args = append(args[:0],
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", externalIP), "-d", fmt.Sprintf("%s/32", externalIP),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", strconv.Itoa(svcInfo.port),
} )
// We have to SNAT packets to external IPs. // We have to SNAT packets to external IPs.
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
@ -1166,17 +1247,17 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", externalIP), "-d", fmt.Sprintf("%s/32", externalIP),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", strconv.Itoa(svcInfo.port),
"-j", "REJECT", "-j", "REJECT",
) )
} }
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
fwChain := svcInfo.serviceFirewallChainName
for _, ingress := range svcInfo.loadBalancerStatus.Ingress { for _, ingress := range svcInfo.loadBalancerStatus.Ingress {
if ingress.IP != "" { if ingress.IP != "" {
// create service firewall chain // create service firewall chain
fwChain := serviceFirewallChainName(svcNameString, protocol)
if chain, ok := existingNATChains[fwChain]; ok { if chain, ok := existingNATChains[fwChain]; ok {
writeLine(proxier.natChains, chain) writeLine(proxier.natChains, chain)
} else { } else {
@ -1187,20 +1268,20 @@ func (proxier *Proxier) syncProxyRules() {
// This currently works for loadbalancers that preserves source ips. // This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
args := []string{ args = append(args[:0],
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", ingress.IP), "-d", fmt.Sprintf("%s/32", ingress.IP),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", strconv.Itoa(svcInfo.port),
} )
// jump to service firewall chain // jump to service firewall chain
writeLine(proxier.natRules, append(args, "-j", string(fwChain))...) writeLine(proxier.natRules, append(args, "-j", string(fwChain))...)
args = []string{ args = append(args[:0],
"-A", string(fwChain), "-A", string(fwChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
} )
// Each source match rule in the FW chain may jump to either the SVC or the XLB chain // Each source match rule in the FW chain may jump to either the SVC or the XLB chain
chosenChain := svcXlbChain chosenChain := svcXlbChain
@ -1266,12 +1347,12 @@ func (proxier *Proxier) syncProxyRules() {
replacementPortsMap[lp] = socket replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install iptables rules. } // We're holding the port, so it's OK to install iptables rules.
args := []string{ args = append(args[:0],
"-A", string(kubeNodePortsChain), "-A", string(kubeNodePortsChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort), "--dport", strconv.Itoa(svcInfo.nodePort),
} )
if !svcInfo.onlyNodeLocalEndpoints { if !svcInfo.onlyNodeLocalEndpoints {
// Nodeports need SNAT, unless they're local. // Nodeports need SNAT, unless they're local.
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...)
@ -1293,7 +1374,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", "addrtype", "--dst-type", "LOCAL", "-m", "addrtype", "--dst-type", "LOCAL",
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"--dport", fmt.Sprintf("%d", svcInfo.nodePort), "--dport", strconv.Itoa(svcInfo.nodePort),
"-j", "REJECT", "-j", "REJECT",
) )
} }
@ -1306,7 +1387,7 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol, "-m", protocol, "-p", protocol,
"-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()),
"--dport", fmt.Sprintf("%d", svcInfo.port), "--dport", strconv.Itoa(svcInfo.port),
"-j", "REJECT", "-j", "REJECT",
) )
continue continue
@ -1317,11 +1398,12 @@ func (proxier *Proxier) syncProxyRules() {
// Generate the per-endpoint chains. We do this in multiple passes so we // Generate the per-endpoint chains. We do this in multiple passes so we
// can group rules together. // can group rules together.
// These two slices parallel each other - keep in sync // These two slices parallel each other - keep in sync
endpoints := make([]*endpointsInfo, 0) endpoints = endpoints[:0]
endpointChains := make([]utiliptables.Chain, 0) endpointChains = endpointChains[:0]
var endpointChain utiliptables.Chain
for _, ep := range proxier.endpointsMap[svcName] { for _, ep := range proxier.endpointsMap[svcName] {
endpoints = append(endpoints, ep) endpoints = append(endpoints, ep)
endpointChain := servicePortEndpointChainName(svcNameString, protocol, ep.endpoint) endpointChain = ep.endpointChain(svcNameString, protocol)
endpointChains = append(endpointChains, endpointChain) endpointChains = append(endpointChains, endpointChain)
// Create the endpoint chain, retaining counters if possible. // Create the endpoint chain, retaining counters if possible.
@ -1340,7 +1422,7 @@ func (proxier *Proxier) syncProxyRules() {
"-A", string(svcChain), "-A", string(svcChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
"-m", "recent", "--name", string(endpointChain), "-m", "recent", "--name", string(endpointChain),
"--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap", "--rcheck", "--seconds", strconv.Itoa(svcInfo.stickyMaxAgeMinutes*60), "--reap",
"-j", string(endpointChain)) "-j", string(endpointChain))
} }
} }
@ -1349,29 +1431,29 @@ func (proxier *Proxier) syncProxyRules() {
n := len(endpointChains) n := len(endpointChains)
for i, endpointChain := range endpointChains { for i, endpointChain := range endpointChains {
// Balancing rules in the per-service chain. // Balancing rules in the per-service chain.
args := []string{ args = append(args[:0], []string{
"-A", string(svcChain), "-A", string(svcChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
} }...)
if i < (n - 1) { if i < (n - 1) {
// Each rule is a probabilistic match. // Each rule is a probabilistic match.
args = append(args, args = append(args,
"-m", "statistic", "-m", "statistic",
"--mode", "random", "--mode", "random",
"--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i))) "--probability", proxier.probability(n-i))
} }
// The final (or only if n == 1) rule is a guaranteed match. // The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain)) args = append(args, "-j", string(endpointChain))
writeLine(proxier.natRules, args...) writeLine(proxier.natRules, args...)
// Rules in the per-endpoint chain. // Rules in the per-endpoint chain.
args = []string{ args = append(args[:0],
"-A", string(endpointChain), "-A", string(endpointChain),
"-m", "comment", "--comment", svcNameString, "-m", "comment", "--comment", svcNameString,
} )
// Handle traffic that loops back to the originator with SNAT. // Handle traffic that loops back to the originator with SNAT.
writeLine(proxier.natRules, append(args, writeLine(proxier.natRules, append(args,
"-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].endpoint, ":")[0]), "-s", fmt.Sprintf("%s/32", endpoints[i].IPPart()),
"-j", string(KubeMarkMasqChain))...) "-j", string(KubeMarkMasqChain))...)
// Update client-affinity lists. // Update client-affinity lists.
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
@ -1402,42 +1484,42 @@ func (proxier *Proxier) syncProxyRules() {
// Service's ClusterIP instead. This happens whether or not we have local // Service's ClusterIP instead. This happens whether or not we have local
// endpoints; only if clusterCIDR is specified // endpoints; only if clusterCIDR is specified
if len(proxier.clusterCIDR) > 0 { if len(proxier.clusterCIDR) > 0 {
args = []string{ args = append(args[:0],
"-A", string(svcXlbChain), "-A", string(svcXlbChain),
"-m", "comment", "--comment", "-m", "comment", "--comment",
fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`), `"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
"-s", proxier.clusterCIDR, "-s", proxier.clusterCIDR,
"-j", string(svcChain), "-j", string(svcChain),
} )
writeLine(proxier.natRules, args...) writeLine(proxier.natRules, args...)
} }
numLocalEndpoints := len(localEndpointChains) numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 { if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints // Blackhole all traffic since there are no local endpoints
args := []string{ args = append(args[:0],
"-A", string(svcXlbChain), "-A", string(svcXlbChain),
"-m", "comment", "--comment", "-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString), fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j", "-j",
string(KubeMarkDropChain), string(KubeMarkDropChain),
} )
writeLine(proxier.natRules, args...) writeLine(proxier.natRules, args...)
} else { } else {
// Setup probability filter rules only over local endpoints // Setup probability filter rules only over local endpoints
for i, endpointChain := range localEndpointChains { for i, endpointChain := range localEndpointChains {
// Balancing rules in the per-service chain. // Balancing rules in the per-service chain.
args := []string{ args = append(args[:0],
"-A", string(svcXlbChain), "-A", string(svcXlbChain),
"-m", "comment", "--comment", "-m", "comment", "--comment",
fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString), fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcNameString),
} )
if i < (numLocalEndpoints - 1) { if i < (numLocalEndpoints - 1) {
// Each rule is a probabilistic match. // Each rule is a probabilistic match.
args = append(args, args = append(args,
"-m", "statistic", "-m", "statistic",
"--mode", "random", "--mode", "random",
"--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i))) "--probability", proxier.probability(numLocalEndpoints-i))
} }
// The final (or only if n == 1) rule is a guaranteed match. // The final (or only if n == 1) rule is a guaranteed match.
args = append(args, "-j", string(endpointChain)) args = append(args, "-j", string(endpointChain))

View File

@ -386,22 +386,23 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier {
// TODO: Call NewProxier after refactoring out the goroutine // TODO: Call NewProxier after refactoring out the goroutine
// invocation into a Run() method. // invocation into a Run() method.
p := &Proxier{ p := &Proxier{
exec: &exec.FakeExec{}, exec: &exec.FakeExec{},
serviceMap: make(proxyServiceMap), serviceMap: make(proxyServiceMap),
serviceChanges: newServiceChangeMap(), serviceChanges: newServiceChangeMap(),
endpointsMap: make(proxyEndpointsMap), endpointsMap: make(proxyEndpointsMap),
endpointsChanges: newEndpointsChangeMap(testHostname), endpointsChanges: newEndpointsChangeMap(testHostname),
iptables: ipt, iptables: ipt,
clusterCIDR: "10.0.0.0/24", clusterCIDR: "10.0.0.0/24",
hostname: testHostname, hostname: testHostname,
portsMap: make(map[localPort]closeable), portsMap: make(map[localPort]closeable),
portMapper: &fakePortOpener{[]*localPort{}}, portMapper: &fakePortOpener{[]*localPort{}},
healthChecker: newFakeHealthChecker(), healthChecker: newFakeHealthChecker(),
iptablesData: bytes.NewBuffer(nil), precomputedProbabilities: make([]string, 0, 1001),
filterChains: bytes.NewBuffer(nil), iptablesData: bytes.NewBuffer(nil),
filterRules: bytes.NewBuffer(nil), filterChains: bytes.NewBuffer(nil),
natChains: bytes.NewBuffer(nil), filterRules: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil), natChains: bytes.NewBuffer(nil),
natRules: bytes.NewBuffer(nil),
} }
p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1) p.syncRunner = async.NewBoundedFrequencyRunner("test-sync-runner", p.syncProxyRules, 0, time.Minute, 1)
return p return p
@ -1292,7 +1293,7 @@ func Test_getLocalIPs(t *testing.T) {
// Case[1]: unnamed port // Case[1]: unnamed port
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expected: map[types.NamespacedName]sets.String{}, expected: map[types.NamespacedName]sets.String{},
@ -1300,7 +1301,7 @@ func Test_getLocalIPs(t *testing.T) {
// Case[2]: unnamed port local // Case[2]: unnamed port local
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", true}, {endpoint: "1.1.1.1:11", isLocal: true},
}, },
}, },
expected: map[types.NamespacedName]sets.String{ expected: map[types.NamespacedName]sets.String{
@ -1310,12 +1311,12 @@ func Test_getLocalIPs(t *testing.T) {
// Case[3]: named local and non-local ports for the same IP. // Case[3]: named local and non-local ports for the same IP.
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
{"1.1.1.2:11", true}, {endpoint: "1.1.1.2:11", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.1:12", false}, {endpoint: "1.1.1.1:12", isLocal: false},
{"1.1.1.2:12", true}, {endpoint: "1.1.1.2:12", isLocal: true},
}, },
}, },
expected: map[types.NamespacedName]sets.String{ expected: map[types.NamespacedName]sets.String{
@ -1325,21 +1326,21 @@ func Test_getLocalIPs(t *testing.T) {
// Case[4]: named local and non-local ports for different IPs. // Case[4]: named local and non-local ports for different IPs.
endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{ endpointsMap: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
makeServicePortName("ns2", "ep2", "p22"): { makeServicePortName("ns2", "ep2", "p22"): {
{"2.2.2.2:22", true}, {endpoint: "2.2.2.2:22", isLocal: true},
{"2.2.2.22:22", true}, {endpoint: "2.2.2.22:22", isLocal: true},
}, },
makeServicePortName("ns2", "ep2", "p23"): { makeServicePortName("ns2", "ep2", "p23"): {
{"2.2.2.3:23", true}, {endpoint: "2.2.2.3:23", isLocal: true},
}, },
makeServicePortName("ns4", "ep4", "p44"): { makeServicePortName("ns4", "ep4", "p44"): {
{"4.4.4.4:44", true}, {endpoint: "4.4.4.4:44", isLocal: true},
{"4.4.4.5:44", false}, {endpoint: "4.4.4.5:44", isLocal: false},
}, },
makeServicePortName("ns4", "ep4", "p45"): { makeServicePortName("ns4", "ep4", "p45"): {
{"4.4.4.6:45", true}, {endpoint: "4.4.4.6:45", isLocal: true},
}, },
}, },
expected: map[types.NamespacedName]sets.String{ expected: map[types.NamespacedName]sets.String{
@ -1384,7 +1385,7 @@ func Test_endpointsToEndpointsMap(t *testing.T) {
}), }),
expected: map[proxy.ServicePortName][]*endpointsInfo{ expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
}, { }, {
@ -1404,7 +1405,7 @@ func Test_endpointsToEndpointsMap(t *testing.T) {
}), }),
expected: map[proxy.ServicePortName][]*endpointsInfo{ expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "port"): { makeServicePortName("ns1", "ep1", "port"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
}, { }, {
@ -1423,7 +1424,7 @@ func Test_endpointsToEndpointsMap(t *testing.T) {
}), }),
expected: map[proxy.ServicePortName][]*endpointsInfo{ expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
}, { }, {
@ -1452,12 +1453,12 @@ func Test_endpointsToEndpointsMap(t *testing.T) {
}), }),
expected: map[proxy.ServicePortName][]*endpointsInfo{ expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p1"): { makeServicePortName("ns1", "ep1", "p1"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
{"2.2.2.2:11", false}, {endpoint: "2.2.2.2:11", isLocal: false},
}, },
makeServicePortName("ns1", "ep1", "p2"): { makeServicePortName("ns1", "ep1", "p2"): {
{"1.1.1.1:22", false}, {endpoint: "1.1.1.1:22", isLocal: false},
{"2.2.2.2:22", false}, {endpoint: "2.2.2.2:22", isLocal: false},
}, },
}, },
}, { }, {
@ -1477,7 +1478,7 @@ func Test_endpointsToEndpointsMap(t *testing.T) {
}), }),
expected: map[proxy.ServicePortName][]*endpointsInfo{ expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p1"): { makeServicePortName("ns1", "ep1", "p1"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
}, { }, {
@ -1497,7 +1498,7 @@ func Test_endpointsToEndpointsMap(t *testing.T) {
}), }),
expected: map[proxy.ServicePortName][]*endpointsInfo{ expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p2"): { makeServicePortName("ns1", "ep1", "p2"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
}, { }, {
@ -1517,7 +1518,7 @@ func Test_endpointsToEndpointsMap(t *testing.T) {
}), }),
expected: map[proxy.ServicePortName][]*endpointsInfo{ expected: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p1"): { makeServicePortName("ns1", "ep1", "p1"): {
{"1.1.1.1:22", false}, {endpoint: "1.1.1.1:22", isLocal: false},
}, },
}, },
}} }}
@ -1931,12 +1932,12 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -1951,12 +1952,12 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true}, {endpoint: "1.1.1.1:11", isLocal: true},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true}, {endpoint: "1.1.1.1:11", isLocal: true},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -1973,18 +1974,18 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.2:12", false}, {endpoint: "1.1.1.2:12", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.2:12", false}, {endpoint: "1.1.1.2:12", isLocal: false},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -1999,24 +2000,24 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true}, {endpoint: "1.1.1.1:11", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.1:12", true}, {endpoint: "1.1.1.1:12", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p13"): { makeServicePortName("ns1", "ep1", "p13"): {
{"1.1.1.3:13", false}, {endpoint: "1.1.1.3:13", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", true}, {endpoint: "1.1.1.1:11", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.1:12", true}, {endpoint: "1.1.1.1:12", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p13"): { makeServicePortName("ns1", "ep1", "p13"): {
{"1.1.1.3:13", false}, {endpoint: "1.1.1.3:13", isLocal: false},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -2035,54 +2036,54 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
{"1.1.1.2:11", true}, {endpoint: "1.1.1.2:11", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.1:12", false}, {endpoint: "1.1.1.1:12", isLocal: false},
{"1.1.1.2:12", true}, {endpoint: "1.1.1.2:12", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p13"): { makeServicePortName("ns1", "ep1", "p13"): {
{"1.1.1.3:13", false}, {endpoint: "1.1.1.3:13", isLocal: false},
{"1.1.1.4:13", true}, {endpoint: "1.1.1.4:13", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p14"): { makeServicePortName("ns1", "ep1", "p14"): {
{"1.1.1.3:14", false}, {endpoint: "1.1.1.3:14", isLocal: false},
{"1.1.1.4:14", true}, {endpoint: "1.1.1.4:14", isLocal: true},
}, },
makeServicePortName("ns2", "ep2", "p21"): { makeServicePortName("ns2", "ep2", "p21"): {
{"2.2.2.1:21", false}, {endpoint: "2.2.2.1:21", isLocal: false},
{"2.2.2.2:21", true}, {endpoint: "2.2.2.2:21", isLocal: true},
}, },
makeServicePortName("ns2", "ep2", "p22"): { makeServicePortName("ns2", "ep2", "p22"): {
{"2.2.2.1:22", false}, {endpoint: "2.2.2.1:22", isLocal: false},
{"2.2.2.2:22", true}, {endpoint: "2.2.2.2:22", isLocal: true},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
{"1.1.1.2:11", true}, {endpoint: "1.1.1.2:11", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.1:12", false}, {endpoint: "1.1.1.1:12", isLocal: false},
{"1.1.1.2:12", true}, {endpoint: "1.1.1.2:12", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p13"): { makeServicePortName("ns1", "ep1", "p13"): {
{"1.1.1.3:13", false}, {endpoint: "1.1.1.3:13", isLocal: false},
{"1.1.1.4:13", true}, {endpoint: "1.1.1.4:13", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p14"): { makeServicePortName("ns1", "ep1", "p14"): {
{"1.1.1.3:14", false}, {endpoint: "1.1.1.3:14", isLocal: false},
{"1.1.1.4:14", true}, {endpoint: "1.1.1.4:14", isLocal: true},
}, },
makeServicePortName("ns2", "ep2", "p21"): { makeServicePortName("ns2", "ep2", "p21"): {
{"2.2.2.1:21", false}, {endpoint: "2.2.2.1:21", isLocal: false},
{"2.2.2.2:21", true}, {endpoint: "2.2.2.2:21", isLocal: true},
}, },
makeServicePortName("ns2", "ep2", "p22"): { makeServicePortName("ns2", "ep2", "p22"): {
{"2.2.2.1:22", false}, {endpoint: "2.2.2.1:22", isLocal: false},
{"2.2.2.2:22", true}, {endpoint: "2.2.2.2:22", isLocal: true},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -2101,7 +2102,7 @@ func Test_updateEndpointsMap(t *testing.T) {
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{},
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", true}, {endpoint: "1.1.1.1:11", isLocal: true},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -2118,7 +2119,7 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", ""): { makeServicePortName("ns1", "ep1", ""): {
{"1.1.1.1:11", true}, {endpoint: "1.1.1.1:11", isLocal: true},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{},
@ -2137,17 +2138,17 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
{"1.1.1.2:11", true}, {endpoint: "1.1.1.2:11", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.1:12", false}, {endpoint: "1.1.1.1:12", isLocal: false},
{"1.1.1.2:12", true}, {endpoint: "1.1.1.2:12", isLocal: true},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -2164,17 +2165,17 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
{"1.1.1.2:11", true}, {endpoint: "1.1.1.2:11", isLocal: true},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.1:12", false}, {endpoint: "1.1.1.1:12", isLocal: false},
{"1.1.1.2:12", true}, {endpoint: "1.1.1.2:12", isLocal: true},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedStale: []endpointServicePair{{ expectedStale: []endpointServicePair{{
@ -2198,15 +2199,15 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.2:12", true}, {endpoint: "1.1.1.2:12", isLocal: true},
}, },
}, },
expectedStale: []endpointServicePair{}, expectedStale: []endpointServicePair{},
@ -2223,15 +2224,15 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.2:12", false}, {endpoint: "1.1.1.2:12", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedStale: []endpointServicePair{{ expectedStale: []endpointServicePair{{
@ -2249,12 +2250,12 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11-2"): { makeServicePortName("ns1", "ep1", "p11-2"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedStale: []endpointServicePair{{ expectedStale: []endpointServicePair{{
@ -2272,12 +2273,12 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:22", false}, {endpoint: "1.1.1.1:22", isLocal: false},
}, },
}, },
expectedStale: []endpointServicePair{{ expectedStale: []endpointServicePair{{
@ -2301,39 +2302,39 @@ func Test_updateEndpointsMap(t *testing.T) {
}, },
oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
}, },
makeServicePortName("ns2", "ep2", "p22"): { makeServicePortName("ns2", "ep2", "p22"): {
{"2.2.2.2:22", true}, {endpoint: "2.2.2.2:22", isLocal: true},
{"2.2.2.22:22", true}, {endpoint: "2.2.2.22:22", isLocal: true},
}, },
makeServicePortName("ns2", "ep2", "p23"): { makeServicePortName("ns2", "ep2", "p23"): {
{"2.2.2.3:23", true}, {endpoint: "2.2.2.3:23", isLocal: true},
}, },
makeServicePortName("ns4", "ep4", "p44"): { makeServicePortName("ns4", "ep4", "p44"): {
{"4.4.4.4:44", true}, {endpoint: "4.4.4.4:44", isLocal: true},
{"4.4.4.5:44", true}, {endpoint: "4.4.4.5:44", isLocal: true},
}, },
makeServicePortName("ns4", "ep4", "p45"): { makeServicePortName("ns4", "ep4", "p45"): {
{"4.4.4.6:45", true}, {endpoint: "4.4.4.6:45", isLocal: true},
}, },
}, },
expectedResult: map[proxy.ServicePortName][]*endpointsInfo{ expectedResult: map[proxy.ServicePortName][]*endpointsInfo{
makeServicePortName("ns1", "ep1", "p11"): { makeServicePortName("ns1", "ep1", "p11"): {
{"1.1.1.1:11", false}, {endpoint: "1.1.1.1:11", isLocal: false},
{"1.1.1.11:11", false}, {endpoint: "1.1.1.11:11", isLocal: false},
}, },
makeServicePortName("ns1", "ep1", "p12"): { makeServicePortName("ns1", "ep1", "p12"): {
{"1.1.1.2:12", false}, {endpoint: "1.1.1.2:12", isLocal: false},
}, },
makeServicePortName("ns1", "ep1", "p122"): { makeServicePortName("ns1", "ep1", "p122"): {
{"1.1.1.2:122", false}, {endpoint: "1.1.1.2:122", isLocal: false},
}, },
makeServicePortName("ns3", "ep3", "p33"): { makeServicePortName("ns3", "ep3", "p33"): {
{"3.3.3.3:33", false}, {endpoint: "3.3.3.3:33", isLocal: false},
}, },
makeServicePortName("ns4", "ep4", "p44"): { makeServicePortName("ns4", "ep4", "p44"): {
{"4.4.4.4:44", true}, {endpoint: "4.4.4.4:44", isLocal: true},
}, },
}, },
expectedStale: []endpointServicePair{{ expectedStale: []endpointServicePair{{

View File

@ -52,7 +52,9 @@ func GetChainLines(table Table, save []byte) map[Chain]string {
} else if strings.HasPrefix(line, "#") { } else if strings.HasPrefix(line, "#") {
continue continue
} else if strings.HasPrefix(line, ":") && len(line) > 1 { } else if strings.HasPrefix(line, ":") && len(line) > 1 {
chain := Chain(strings.SplitN(line[1:], " ", 2)[0]) // We assume that the <line> contains space - chain lines have 3 fields,
// space delimited. If there is no space, this line will panic.
chain := Chain(line[1:strings.Index(line, " ")])
chainsMap[chain] = line chainsMap[chain] = line
} }
} }