Merge pull request #106497 from danwinship/traffic-policy-fixes

fix internalTrafficPolicy
Kubernetes Prow Robot 2022-03-28 14:19:54 -07:00, committed by GitHub
commit 9f213370cc
7 changed files with 658 additions and 312 deletions

pkg/proxy/iptables/proxier.go View File

@ -38,11 +38,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
@ -119,6 +117,7 @@ type serviceInfo struct {
// The following fields are computed and stored for performance reasons.
serviceNameString string
servicePortChainName utiliptables.Chain
serviceLocalChainName utiliptables.Chain
serviceFirewallChainName utiliptables.Chain
serviceLBChainName utiliptables.Chain
}
@ -133,6 +132,7 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.B
protocol := strings.ToLower(string(info.Protocol()))
info.serviceNameString = svcPortName.String()
info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol)
info.serviceLocalChainName = serviceLocalChainName(info.serviceNameString, protocol)
info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol)
info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol)
@ -436,7 +436,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) {
// Hunt for service and endpoint chains.
for chain := range existingNATChains {
chainString := string(chain)
if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") {
if isServiceChainName(chainString) {
natChains.WriteBytes(existingNATChains[chain]) // flush
natRules.Write("-X", chainString) // delete
}
@ -683,34 +683,65 @@ func portProtoHash(servicePortName string, protocol string) string {
return encoded[:16]
}
// servicePortChainName takes the ServicePortName for a service and
// returns the associated iptables chain. This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-SVC-".
const (
servicePortChainNamePrefix = "KUBE-SVC-"
serviceLocalChainNamePrefix = "KUBE-SVL-"
serviceFirewallChainNamePrefix = "KUBE-FW-"
serviceLBChainNamePrefix = "KUBE-XLB-"
servicePortEndpointChainNamePrefix = "KUBE-SEP-"
)
// servicePortChainName returns the name of the KUBE-SVC-XXXX chain for a service, which is the
// main iptables chain for that service, used for dispatching to endpoints when using `Cluster`
// traffic policy.
func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol))
return utiliptables.Chain(servicePortChainNamePrefix + portProtoHash(servicePortName, protocol))
}
// serviceFirewallChainName takes the ServicePortName for a service and
// returns the associated iptables chain. This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-FW-".
// serviceLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which
// handles dispatching to local endpoints when using `Local` traffic policy. This chain only
// exists if the service has `Local` internal or external traffic policy.
func serviceLocalChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain(serviceLocalChainNamePrefix + portProtoHash(servicePortName, protocol))
}
// serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which
// is used to implement the filtering for the LoadBalancerSourceRanges feature.
func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain("KUBE-FW-" + portProtoHash(servicePortName, protocol))
return utiliptables.Chain(serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol))
}
// serviceLBPortChainName takes the ServicePortName for a service and
// returns the associated iptables chain. This is computed by hashing (sha256)
// then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do
// this because IPTables Chain Names must be <= 28 chars long, and the longer
// they are the harder they are to read.
// serviceLBChainName returns the name of the KUBE-XLB-XXXX chain for a service, which
// implements "short-circuiting" for internally-originated load balancer traffic when using
// `Local` external traffic policy. It forwards traffic from local sources to the KUBE-SVC-XXXX
// chain and traffic from external sources to the KUBE-SVL-XXXX chain.
func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain("KUBE-XLB-" + portProtoHash(servicePortName, protocol))
return utiliptables.Chain(serviceLBChainNamePrefix + portProtoHash(servicePortName, protocol))
}
// This is the same as servicePortChainName but with the endpoint included.
// servicePortEndpointChainName returns the name of the KUBE-SEP-XXXX chain for a particular
// service endpoint.
func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain {
hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint))
encoded := base32.StdEncoding.EncodeToString(hash[:])
return utiliptables.Chain("KUBE-SEP-" + encoded[:16])
return utiliptables.Chain(servicePortEndpointChainNamePrefix + encoded[:16])
}
func isServiceChainName(chainString string) bool {
prefixes := []string{
servicePortChainNamePrefix,
serviceLocalChainNamePrefix,
servicePortEndpointChainNamePrefix,
serviceFirewallChainNamePrefix,
serviceLBChainNamePrefix,
}
for _, p := range prefixes {
if strings.HasPrefix(chainString, p) {
return true
}
}
return false
}
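
A note for readers tracing these names in the test data below: every chain name is its fixed prefix plus the first 16 characters of a base32-encoded SHA-256 hash, which keeps names within iptables' 28-character chain-name limit. A minimal standalone sketch of the scheme (hypothetical package, mirroring portProtoHash and servicePortEndpointChainName above):

package main

import (
	"crypto/sha256"
	"encoding/base32"
	"fmt"
	"strings"
)

// chainName mirrors the portProtoHash-based scheme: hash the service port
// name plus protocol with SHA-256, base32-encode, and keep 16 characters.
func chainName(prefix, servicePortName, protocol string) string {
	hash := sha256.Sum256([]byte(servicePortName + protocol))
	encoded := base32.StdEncoding.EncodeToString(hash[:])
	return prefix + encoded[:16]
}

func main() {
	name := chainName("KUBE-SVL-", "ns2/svc2:p80", "tcp")
	// "KUBE-SVL-" (9 chars) + 16 hash chars = 25 chars, under the 28-char limit.
	fmt.Println(name, len(name), strings.HasPrefix(name, "KUBE-SVL-"))
}
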
// After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we
@ -931,15 +962,6 @@ func (proxier *Proxier) syncProxyRules() {
// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// We create these slices once here to avoid memory reallocations
// in every loop. Note that to reuse the memory, instead of doing:
// slice = <some new slice>
// you should always do one of the below:
// slice = slice[:0] // and then append to it
// slice = append(slice[:0], ...)
readyEndpointChains := make([]utiliptables.Chain, 0)
localEndpointChains := make([]utiliptables.Chain, 0)
// To avoid growing this slice, we arbitrarily set its size to 64;
// there are never more than that many arguments for a single line.
// Note that even if we go over 64, it will still be correct - it
@ -980,36 +1002,14 @@ func (proxier *Proxier) syncProxyRules() {
allEndpoints := proxier.endpointsMap[svcName]
// Filtering for topology aware endpoints. This function will only
// filter endpoints if appropriate feature gates are enabled and the
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
// Scan the endpoints list to see what we have. "hasEndpoints" will be true
// if there are any usable endpoints for this service anywhere in the cluster.
var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
for _, ep := range allEndpoints {
if ep.IsReady() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalReadyEndpoints = true
}
} else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if ep.IsServing() && ep.IsTerminating() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalServingTerminatingEndpoints = true
}
}
}
}
useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints
// Figure out the endpoints for Cluster and Local traffic policy.
// allLocallyReachableEndpoints is the set of all endpoints that can be routed to
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
// Generate the per-endpoint chains.
readyEndpointChains = readyEndpointChains[:0]
localEndpointChains = localEndpointChains[:0]
for _, ep := range allEndpoints {
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
@ -1017,27 +1017,6 @@ func (proxier *Proxier) syncProxyRules() {
}
endpointChain := epInfo.ChainName
endpointInUse := false
if epInfo.Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
endpointInUse = true
}
if svc.NodeLocalExternal() && epInfo.IsLocal {
if useTerminatingEndpoints {
if epInfo.Serving && epInfo.Terminating {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
} else if epInfo.Ready {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
}
if !endpointInUse {
continue
}
// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
@ -1063,27 +1042,80 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natRules.Write(args)
}
svcChain := svcInfo.servicePortChainName
if hasEndpoints {
// Create the per-service chain, retaining counters if possible.
if chain, ok := existingNATChains[svcChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(svcChain))
}
activeNATChains[svcChain] = true
policyClusterChain := svcInfo.servicePortChainName
policyLocalChain := svcInfo.serviceLocalChainName
svcXlbChain := svcInfo.serviceLBChainName
internalTrafficChain := policyClusterChain
externalTrafficChain := policyClusterChain
if svcInfo.NodeLocalInternal() {
internalTrafficChain = policyLocalChain
}
if svcInfo.NodeLocalExternal() {
externalTrafficChain = svcXlbChain
}
svcXlbChain := svcInfo.serviceLBChainName
if hasEndpoints && svcInfo.NodeLocalExternal() {
// Only for services requesting OnlyLocal traffic,
// create the per-service LB chain, retaining counters if possible.
if lbChain, ok := existingNATChains[svcXlbChain]; ok {
proxier.natChains.WriteBytes(lbChain)
if hasEndpoints && svcInfo.UsesClusterEndpoints() {
// Create the Cluster traffic policy chain, retaining counters if possible.
if chain, ok := existingNATChains[policyClusterChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(policyClusterChain))
}
activeNATChains[policyClusterChain] = true
}
if hasEndpoints && svcInfo.ExternallyAccessible() && svcInfo.NodeLocalExternal() {
if chain, ok := existingNATChains[svcXlbChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(svcXlbChain))
}
activeNATChains[svcXlbChain] = true
// The XLB chain redirects all pod -> external VIP
// traffic to the Service's ClusterIP instead. This happens
// whether or not we have local endpoints, but only if
// localDetector is implemented.
if proxier.localDetector.IsImplemented() {
proxier.natRules.Write(
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
proxier.localDetector.IfLocal(),
"-j", string(policyClusterChain))
}
// Next, redirect all src-type=LOCAL -> LB IP traffic to the service
// chain for externalTrafficPolicy=Local. This allows traffic
// originating from the host to be redirected to the service
// correctly; otherwise, traffic to LB IPs is dropped if there are
// no local endpoints.
proxier.natRules.Write(
"-A", string(svcXlbChain),
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(KubeMarkMasqChain))
proxier.natRules.Write(
"-A", string(svcXlbChain),
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL",
"-j", string(policyClusterChain))
// Everything else goes to the SVL chain
proxier.natRules.Write(
"-A", string(svcXlbChain),
"-j", string(policyLocalChain))
}
if hasEndpoints && svcInfo.UsesLocalEndpoints() {
if chain, ok := existingNATChains[policyLocalChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(policyLocalChain))
}
activeNATChains[policyLocalChain] = true
}
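
A hedged condensation of the chain selection above (hypothetical helper, not part of the PR): internal traffic leaves the main KUBE-SVC- chain only when internalTrafficPolicy is Local, while externalTrafficPolicy=Local sends external traffic through KUBE-XLB-, whose final rule falls through to KUBE-SVL-.

// trafficChains summarizes which chain each kind of traffic enters first,
// given the two traffic policies (illustrative only).
func trafficChains(hash string, nodeLocalInternal, nodeLocalExternal bool) (internal, external string) {
	internal, external = "KUBE-SVC-"+hash, "KUBE-SVC-"+hash
	if nodeLocalInternal {
		internal = "KUBE-SVL-" + hash // dispatch to local endpoints only
	}
	if nodeLocalExternal {
		external = "KUBE-XLB-" + hash // short-circuit, then fall through to KUBE-SVL-
	}
	return internal, external
}
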
// Capture the clusterIP.
@ -1096,7 +1128,7 @@ func (proxier *Proxier) syncProxyRules() {
)
if proxier.masqueradeAll {
proxier.natRules.Write(
"-A", string(svcChain),
"-A", string(internalTrafficChain),
args,
"-j", string(KubeMarkMasqChain))
} else if proxier.localDetector.IsImplemented() {
@ -1104,9 +1136,8 @@ func (proxier *Proxier) syncProxyRules() {
// is that you can establish a static route for your Service range,
// routing to any node, and that node will bridge into the Service
// for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this.
proxier.natRules.Write(
"-A", string(svcChain),
"-A", string(internalTrafficChain),
args,
proxier.localDetector.IfNotLocal(),
"-j", string(KubeMarkMasqChain))
@ -1114,7 +1145,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natRules.Write(
"-A", string(kubeServicesChain),
args,
"-j", string(svcChain))
"-j", string(internalTrafficChain))
} else {
// No endpoints.
proxier.filterRules.Write(
@ -1137,14 +1168,12 @@ func (proxier *Proxier) syncProxyRules() {
"--dport", strconv.Itoa(svcInfo.Port()),
)
destChain := svcXlbChain
// We have to SNAT packets to external IPs if externalTrafficPolicy is
// Cluster and the traffic is NOT local. Local traffic coming from Pods and
// Nodes will always be forwarded to the corresponding Service, so there is
// no need to SNAT. If we can't differentiate the local traffic, we always SNAT.
if !svcInfo.NodeLocalExternal() {
appendTo := []string{"-A", string(svcChain)}
destChain = svcChain
appendTo := []string{"-A", string(policyClusterChain)}
// This masquerades off-cluster traffic to an external IP.
if proxier.localDetector.IsImplemented() {
proxier.natRules.Write(
@ -1163,7 +1192,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natRules.Write(
"-A", string(kubeServicesChain),
args,
"-j", string(destChain))
"-j", string(externalTrafficChain))
} else {
// No endpoints.
@ -1208,23 +1237,20 @@ func (proxier *Proxier) syncProxyRules() {
"-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString),
)
// Each source match rule in the FW chain may jump to either the SVC or the XLB chain
chosenChain := svcXlbChain
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if !svcInfo.NodeLocalExternal() {
proxier.natRules.Write(args, "-j", string(KubeMarkMasqChain))
chosenChain = svcChain
}
if len(svcInfo.LoadBalancerSourceRanges()) == 0 {
// allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain
proxier.natRules.Write(args, "-j", string(chosenChain))
proxier.natRules.Write(args, "-j", string(externalTrafficChain))
} else {
// firewall filter based on each source range
allowFromNode := false
for _, src := range svcInfo.LoadBalancerSourceRanges() {
proxier.natRules.Write(args, "-s", src, "-j", string(chosenChain))
proxier.natRules.Write(args, "-s", src, "-j", string(externalTrafficChain))
_, cidr, err := netutils.ParseCIDRSloppy(src)
if err != nil {
klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr)
@ -1239,7 +1265,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natRules.Write(
args,
"-s", ingress,
"-j", string(chosenChain))
"-j", string(externalTrafficChain))
}
}
@ -1263,7 +1289,6 @@ func (proxier *Proxier) syncProxyRules() {
// worthwhile to make a new per-service chain for nodeport rules, but
// with just 2 rules it ends up being a waste and a cognitive burden.
if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 {
if hasEndpoints {
args = append(args[:0],
"-m", "comment", "--comment", svcNameString,
@ -1273,14 +1298,9 @@ func (proxier *Proxier) syncProxyRules() {
if !svcInfo.NodeLocalExternal() {
// Nodeports need SNAT, unless they're local.
proxier.natRules.Write(
"-A", string(svcChain),
"-A", string(policyClusterChain),
args,
"-j", string(KubeMarkMasqChain))
// Jump to the service chain.
proxier.natRules.Write(
"-A", string(kubeNodePortsChain),
args,
"-j", string(svcChain))
} else {
// TODO: Make all nodePorts jump to the firewall chain.
// Currently we only create it for loadbalancers (#33586).
@ -1290,16 +1310,16 @@ func (proxier *Proxier) syncProxyRules() {
if isIPv6 {
loopback = "::1/128"
}
appendTo := []string{"-A", string(kubeNodePortsChain)}
proxier.natRules.Write(
appendTo,
"-A", string(kubeNodePortsChain),
args,
"-s", loopback, "-j", string(KubeMarkMasqChain))
proxier.natRules.Write(
appendTo,
args,
"-j", string(svcXlbChain))
}
// Jump to the service chain.
proxier.natRules.Write(
"-A", string(kubeNodePortsChain),
args,
"-j", string(externalTrafficChain))
} else {
// No endpoints.
proxier.filterRules.Write(
@ -1326,57 +1346,26 @@ func (proxier *Proxier) syncProxyRules() {
)
}
if !hasEndpoints {
continue
if svcInfo.UsesClusterEndpoints() {
// Write rules jumping from policyClusterChain to clusterEndpoints
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, policyClusterChain, clusterEndpoints, args)
}
// Write rules jumping from svcChain to readyEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcChain, readyEndpointChains, args)
// The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.NodeLocalExternal() {
continue
}
// First rule in the chain redirects all pod -> external VIP traffic to the
// Service's ClusterIP instead. This happens whether or not we have local
// endpoints; only if localDetector is implemented
if proxier.localDetector.IsImplemented() {
proxier.natRules.Write(
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
proxier.localDetector.IfLocal(),
"-j", string(svcChain))
}
// Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local.
// This allows traffic originating from the host to be redirected to the service correctly;
// otherwise traffic to LB IPs is dropped if there are no local endpoints.
args = append(args[:0], "-A", string(svcXlbChain))
proxier.natRules.Write(
args,
"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))
proxier.natRules.Write(
args,
"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))
numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
// Blackhole all traffic since there are no local endpoints
args = append(args[:0],
"-A", string(svcXlbChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j",
string(KubeMarkDropChain),
)
proxier.natRules.Write(args)
} else {
// Write rules jumping from svcXlbChain to localEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcXlbChain, localEndpointChains, args)
if svcInfo.UsesLocalEndpoints() {
if len(localEndpoints) != 0 {
// Write rules jumping from policyLocalChain to localEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, policyLocalChain, localEndpoints, args)
} else if hasEndpoints {
// Blackhole all traffic since there are no local endpoints
args = append(args[:0],
"-A", string(policyLocalChain),
"-m", "comment", "--comment",
fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
"-j",
string(KubeMarkDropChain),
)
proxier.natRules.Write(args)
}
}
}
@ -1384,7 +1373,7 @@ func (proxier *Proxier) syncProxyRules() {
for chain := range existingNATChains {
if !activeNATChains[chain] {
chainString := string(chain)
if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
if !isServiceChainName(chainString) {
// Ignore chains that aren't ours.
continue
}
@ -1535,27 +1524,35 @@ func (proxier *Proxier) syncProxyRules() {
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpointChains []utiliptables.Chain, args []string) {
func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
for _, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
continue
}
args = append(args[:0],
"-A", string(svcChain),
)
args = proxier.appendServiceCommentLocked(args, svcNameString)
args = append(args,
"-m", "recent", "--name", string(endpointChain),
"-m", "recent", "--name", string(epInfo.ChainName),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(endpointChain),
"-j", string(epInfo.ChainName),
)
proxier.natRules.Write(args)
}
}
// Now write loadbalancing rules.
numEndpoints := len(endpointChains)
for i, endpointChain := range endpointChains {
// Balancing rules in the per-service chain.
numEndpoints := len(endpoints)
for i, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
continue
}
args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
if i < (numEndpoints - 1) {
@ -1566,6 +1563,6 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInf
"--probability", proxier.probability(numEndpoints-i))
}
// The final (or only if n == 1) rule is a guaranteed match.
proxier.natRules.Write(args, "-j", string(endpointChain))
proxier.natRules.Write(args, "-j", string(epInfo.ChainName))
}
}
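
The statistic-match probabilities written here compose to a uniform distribution: rule i (0-based, of n) matches with probability 1/(n-i), conditioned on the earlier rules not matching, so each endpoint receives an overall 1/n share. A sketch of the arithmetic, assuming proxier.probability(k) emits 1/k as the rules above suggest:

// overallShares computes the effective share each of n endpoints receives
// from a rule sequence like the one above (illustrative, n >= 1).
func overallShares(n int) []float64 {
	shares := make([]float64, n)
	remaining := 1.0
	for i := 0; i < n-1; i++ {
		shares[i] = remaining / float64(n-i) // rule i matches with prob 1/(n-i)
		remaining -= shares[i]
	}
	shares[n-1] = remaining // the final rule is a guaranteed match
	return shares
}

// overallShares(3) == [1/3 1/3 1/3]:
//   rule 0: 1/3
//   rule 1: (1 - 1/3) * 1/2 = 1/3
//   rule 2: remainder = 1/3
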

pkg/proxy/iptables/proxier_test.go View File

@ -995,6 +995,7 @@ COMMIT
:KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0]
:KUBE-SVC-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-XLB-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-SVL-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-FW-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-SEP-RS4RBKLTHTF2IUXJ - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
@ -1025,7 +1026,8 @@ COMMIT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "masquerade LOCAL traffic for ns2/svc2:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "route LOCAL traffic for ns2/svc2:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-GNZBNJ2PO5MGZ6GT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -j KUBE-SVL-GNZBNJ2PO5MGZ6GT
-A KUBE-SVL-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -m comment --comment ns3/svc3:p80 -m tcp -p tcp --dport 3002 -j KUBE-SVC-X27LE4BHSL4DOUIK
@ -1072,6 +1074,7 @@ COMMIT
:KUBE-SVC-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-SVC-X27LE4BHSL4DOUIK - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SVL-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-XLB-GNZBNJ2PO5MGZ6GT - [0:0]
-A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp --dport 3001 -s 127.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp --dport 3001 -j KUBE-XLB-GNZBNJ2PO5MGZ6GT
@ -1111,10 +1114,11 @@ COMMIT
-A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment ns3/svc3:p80 -j KUBE-SEP-OYPFS5VJICHGATKP
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-SXIVWICOYRO3J4NJ
-A KUBE-SVL-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "masquerade LOCAL traffic for ns2/svc2:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "route LOCAL traffic for ns2/svc2:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-GNZBNJ2PO5MGZ6GT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -j KUBE-SVL-GNZBNJ2PO5MGZ6GT
COMMIT
`,
},
@ -1514,6 +1518,7 @@ COMMIT
:KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0]
:KUBE-SVC-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-XLB-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-SVL-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-FW-GNZBNJ2PO5MGZ6GT - [0:0]
:KUBE-SEP-RS4RBKLTHTF2IUXJ - [0:0]
:KUBE-SVC-PAZTZYUUMV5KCDZL - [0:0]
@ -1547,7 +1552,8 @@ COMMIT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "masquerade LOCAL traffic for ns2/svc2:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "route LOCAL traffic for ns2/svc2:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-GNZBNJ2PO5MGZ6GT
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -j KUBE-SVL-GNZBNJ2PO5MGZ6GT
-A KUBE-SVL-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-SVC-PAZTZYUUMV5KCDZL -m comment --comment "ns2b/svc2b:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns2b/svc2b:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-PAZTZYUUMV5KCDZL
-A KUBE-SERVICES -m comment --comment "ns2b/svc2b:p80 loadbalancer IP" -m tcp -p tcp -d 5.6.7.8 --dport 80 -j KUBE-FW-PAZTZYUUMV5KCDZL
@ -2100,6 +2106,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0]
:KUBE-XLB-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0]
:KUBE-SEP-ZX7GRIZKSNUQ3LAJ - [0:0]
@ -2119,7 +2126,8 @@ COMMIT
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O
-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`
@ -2412,6 +2420,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0]
:KUBE-XLB-XPGD46QRK7WJZT7O - [0:0]
:KUBE-FW-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0]
@ -2438,8 +2447,9 @@ COMMIT
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -m recent --name KUBE-SEP-ZX7GRIZKSNUQ3LAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O
-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -m recent --name KUBE-SEP-ZX7GRIZKSNUQ3LAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`
@ -2469,6 +2479,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0]
:KUBE-XLB-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0]
:KUBE-SEP-ZX7GRIZKSNUQ3LAJ - [0:0]
@ -2487,7 +2498,8 @@ COMMIT
-A KUBE-SEP-ZX7GRIZKSNUQ3LAJ -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.180.2.1:80
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O
-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -d 192.168.0.2 -j KUBE-NODEPORTS
COMMIT
`
@ -2515,6 +2527,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0]
:KUBE-XLB-XPGD46QRK7WJZT7O - [0:0]
:KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0]
:KUBE-SEP-ZX7GRIZKSNUQ3LAJ - [0:0]
@ -2535,7 +2548,8 @@ COMMIT
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O
-A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O
-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -d 192.168.0.2 -j KUBE-NODEPORTS
COMMIT
`
@ -3890,6 +3904,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
@ -3914,7 +3929,8 @@ COMMIT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`
@ -4416,15 +4432,15 @@ COMMIT
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
@ -4447,7 +4463,6 @@ COMMIT
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
@ -4457,10 +4472,14 @@ COMMIT
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
@ -4666,6 +4685,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
@ -4695,10 +4715,11 @@ COMMIT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
@ -4786,6 +4807,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
@ -4815,10 +4837,11 @@ COMMIT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
@ -4898,6 +4921,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0]
@ -4923,10 +4947,11 @@ COMMIT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --rcheck --seconds 10800 --reap -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --rcheck --seconds 10800 --reap -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
@ -5011,6 +5036,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0]
@ -5030,7 +5056,8 @@ COMMIT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,
@ -5081,6 +5108,7 @@ COMMIT
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-FW-AQI2S6QIMU7PVVRP - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
@ -5095,7 +5123,8 @@ COMMIT
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,

pkg/proxy/ipvs/proxier.go View File

@ -1951,14 +1951,6 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// curEndpoints represents IPVS destinations listed from current system.
curEndpoints := sets.NewString()
// readyEndpoints represents Endpoints watched from API Server.
readyEndpoints := sets.NewString()
// localReadyEndpoints represents local endpoints that are ready and NOT terminating.
localReadyEndpoints := sets.NewString()
// localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating.
// Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic.
localReadyTerminatingEndpoints := sets.NewString()
curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
if err != nil {
klog.ErrorS(err, "Failed to list IPVS destinations")
@ -1978,32 +1970,17 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if !ok {
klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
} else {
endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels)
clusterEndpoints, localEndpoints, _, _ := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
if onlyNodeLocalEndpoints {
endpoints = localEndpoints
} else {
endpoints = clusterEndpoints
}
}
newEndpoints := sets.NewString()
for _, epInfo := range endpoints {
if epInfo.IsReady() {
readyEndpoints.Insert(epInfo.String())
}
if onlyNodeLocalEndpoints && epInfo.GetIsLocal() {
if epInfo.IsReady() {
localReadyEndpoints.Insert(epInfo.String())
} else if epInfo.IsServing() && epInfo.IsTerminating() {
localReadyTerminatingEndpoints.Insert(epInfo.String())
}
}
}
newEndpoints := readyEndpoints
if onlyNodeLocalEndpoints {
newEndpoints = localReadyEndpoints
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 {
newEndpoints = localReadyTerminatingEndpoints
}
}
newEndpoints.Insert(epInfo.String())
}
// Create new endpoints

pkg/proxy/service.go View File

@ -139,6 +139,24 @@ func (info *BaseServiceInfo) HintsAnnotation() string {
return info.hintsAnnotation
}
// ExternallyAccessible is part of ServicePort interface.
func (info *BaseServiceInfo) ExternallyAccessible() bool {
return info.nodePort != 0 || len(info.loadBalancerStatus.Ingress) != 0 || len(info.externalIPs) != 0
}
// UsesClusterEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) UsesClusterEndpoints() bool {
// The service port uses Cluster endpoints if the internal traffic policy is "Cluster",
// or if it accepts external traffic at all. (Even if the external traffic policy is
// "Local", we need Cluster endpoints to implement short circuiting.)
return !info.nodeLocalInternal || info.ExternallyAccessible()
}
// UsesLocalEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) UsesLocalEndpoints() bool {
return info.nodeLocalInternal || (info.nodeLocalExternal && info.ExternallyAccessible())
}
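
To enumerate the two predicates above, a standalone restatement (hypothetical functions, not the PR's code): any externally accessible service always uses Cluster endpoints, even with externalTrafficPolicy: Local, because the short-circuiting rules need them.

package main

import "fmt"

func usesCluster(nodeLocalInternal, externallyAccessible bool) bool {
	return !nodeLocalInternal || externallyAccessible
}

func usesLocal(nodeLocalInternal, nodeLocalExternal, externallyAccessible bool) bool {
	return nodeLocalInternal || (nodeLocalExternal && externallyAccessible)
}

func main() {
	// Assume an externally accessible service, e.g. one with a NodePort.
	for _, internal := range []bool{false, true} {
		for _, external := range []bool{false, true} {
			fmt.Printf("internalLocal=%-5v externalLocal=%-5v cluster=%v local=%v\n",
				internal, external, usesCluster(internal, true), usesLocal(internal, external, true))
		}
	}
	// Only a ClusterIP-only service (not externally accessible) with
	// internalTrafficPolicy: Local makes usesCluster return false.
}
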
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
nodeLocalExternal := false
if apiservice.RequestsOnlyLocalTraffic(service) {

pkg/proxy/topology.go View File

@ -23,78 +23,165 @@ import (
"k8s.io/kubernetes/pkg/features"
)
// FilterEndpoints filters endpoints based on Service configuration, node
// labels, and enabled feature gates. This is primarily used to enable topology
// aware routing.
func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) []Endpoint {
if svcInfo.NodeLocalExternal() {
return endpoints
// CategorizeEndpoints returns:
//
// - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if
// relevant). This will be nil if the service does not ever use Cluster traffic policy.
//
// - The service's usable Local-traffic-policy endpoints (including terminating endpoints, if
// relevant). This will be nil if the service does not ever use Local traffic policy.
//
// - The combined list of all endpoints reachable from this node (which is the union of the
// previous two lists, but in the case where it is identical to one or the other, we avoid
// allocating a separate list).
//
// - An indication of whether the service has any endpoints reachable from anywhere in the
// cluster. (This may be true even if allReachableEndpoints is empty.)
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
var useTopology, useServingTerminatingEndpoints bool
if svcInfo.UsesClusterEndpoints() {
useTopology = canUseTopology(endpoints, svcInfo, nodeLabels)
clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
if !ep.IsReady() {
return false
}
if useTopology && !availableForTopology(ep, nodeLabels) {
return false
}
return true
})
// If there are any Ready endpoints anywhere in the cluster, we are
// guaranteed to get one in clusterEndpoints.
if len(clusterEndpoints) > 0 {
hasAnyEndpoints = true
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
return FilterLocalEndpoint(endpoints)
if !svcInfo.UsesLocalEndpoints() {
allReachableEndpoints = clusterEndpoints
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
return filterEndpointsWithHints(endpoints, svcInfo.HintsAnnotation(), nodeLabels)
// Pre-scan the endpoints to figure out which type of endpoint Local
// traffic policy will use, and whether there are any usable
// endpoints anywhere in the cluster.
var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
for _, ep := range endpoints {
if ep.IsReady() {
hasAnyEndpoints = true
if ep.GetIsLocal() {
hasLocalReadyEndpoints = true
}
} else if ep.IsServing() && ep.IsTerminating() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
hasAnyEndpoints = true
if ep.GetIsLocal() {
hasLocalServingTerminatingEndpoints = true
}
}
}
return endpoints
if hasLocalReadyEndpoints {
localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
return ep.GetIsLocal() && ep.IsReady()
})
} else if hasLocalServingTerminatingEndpoints {
useServingTerminatingEndpoints = true
localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
return ep.GetIsLocal() && ep.IsServing() && ep.IsTerminating()
})
}
if !svcInfo.UsesClusterEndpoints() {
allReachableEndpoints = localEndpoints
return
}
if !useTopology && !useServingTerminatingEndpoints {
// !useServingTerminatingEndpoints means that localEndpoints contains only
// Ready endpoints. !useTopology means that clusterEndpoints contains *every*
// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
allReachableEndpoints = clusterEndpoints
return
}
// clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while
// localEndpoints may contain terminating or topologically-unavailable local endpoints
// that aren't in clusterEndpoints. So we have to merge the two lists.
endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints))
for _, ep := range clusterEndpoints {
endpointsMap[ep.String()] = ep
}
for _, ep := range localEndpoints {
endpointsMap[ep.String()] = ep
}
allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap))
for _, ep := range endpointsMap {
allReachableEndpoints = append(allReachableEndpoints, ep)
}
return
}
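
An illustrative call site, condensed from the iptables and IPVS proxier changes elsewhere in this PR (surrounding variable names are assumptions):

clusterEndpoints, localEndpoints, allReachableEndpoints, hasEndpoints :=
	proxy.CategorizeEndpoints(allEndpoints, svcInfo, nodeLabels)

// Per-endpoint (KUBE-SEP-) chains are created only for endpoints this node
// can actually route to.
for _, ep := range allReachableEndpoints {
	_ = ep // create the endpoint chain and DNAT rule
}

// The KUBE-SVC- chain then jumps to clusterEndpoints, and the KUBE-SVL-
// chain jumps to localEndpoints, blackholing traffic if localEndpoints is
// empty while hasEndpoints is still true.
_, _, _ = clusterEndpoints, localEndpoints, hasEndpoints
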
// filterEndpointsWithHints provides filtering based on the hints included in
// EndpointSlices. If any of the following are true, the full list of endpoints
// will be returned without any filtering:
// * The AnnotationTopologyAwareHints annotation is not set to "Auto" for this
// Service.
// * No zone is specified in node labels.
// * No endpoints for this Service have a hint pointing to the zone this
// instance of kube-proxy is running in.
// * One or more endpoints for this Service do not have hints specified.
func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, nodeLabels map[string]string) []Endpoint {
// canUseTopology returns true if topology aware routing is enabled and properly configured
// in this cluster. That is, it checks that:
// * The TopologyAwareHints feature is enabled
// * The "service.kubernetes.io/topology-aware-hints" annotation on this Service is set to "Auto"
// * The node's labels include "topology.kubernetes.io/zone"
// * All of the endpoints for this Service have a topology hint
// * At least one endpoint for this Service is hinted for this node's zone.
func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
hintsAnnotation := svcInfo.HintsAnnotation()
if hintsAnnotation != "Auto" && hintsAnnotation != "auto" {
if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" {
klog.InfoS("Skipping topology aware endpoint filtering since Service has unexpected value", "annotationTopologyAwareHints", v1.AnnotationTopologyAwareHints, "hints", hintsAnnotation)
}
return endpoints
return false
}
zone, ok := nodeLabels[v1.LabelTopologyZone]
if !ok || zone == "" {
klog.InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
return endpoints
return false
}
filteredEndpoints := []Endpoint{}
hasEndpointForZone := false
for _, endpoint := range endpoints {
if !endpoint.IsReady() {
continue
}
if endpoint.GetZoneHints().Len() == 0 {
klog.InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint")
return endpoints
return false
}
if endpoint.GetZoneHints().Has(zone) {
filteredEndpoints = append(filteredEndpoints, endpoint)
hasEndpointForZone = true
}
}
if len(filteredEndpoints) == 0 {
if !hasEndpointForZone {
klog.InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
return endpoints
return false
}
return filteredEndpoints
return true
}
// FilterLocalEndpoint returns the node local endpoints
func FilterLocalEndpoint(endpoints []Endpoint) []Endpoint {
var filteredEndpoints []Endpoint
// availableForTopology checks if this endpoint is available for use on this node, given
// topology constraints. (It assumes that canUseTopology() returned true.)
func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
zone := nodeLabels[v1.LabelTopologyZone]
return endpoint.GetZoneHints().Has(zone)
}
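
For example, with assumed values in the style of the tests below (svcInfo is assumed to carry the "Auto" hints annotation): on a node labeled topology.kubernetes.io/zone=zone-a, canUseTopology holds only if every ready endpoint carries a zone hint and at least one hints zone-a; availableForTopology then keeps exactly the zone-a-hinted ones, via filterEndpoints below.

// Hypothetical data mirroring the test cases at the end of this PR.
nodeLabels := map[string]string{v1.LabelTopologyZone: "zone-a"}
endpoints := []Endpoint{
	&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
	&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
}
if canUseTopology(endpoints, svcInfo, nodeLabels) {
	kept := filterEndpoints(endpoints, func(ep Endpoint) bool {
		return ep.IsReady() && availableForTopology(ep, nodeLabels)
	})
	_ = kept // only 10.1.2.3:80 survives
}
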
// filterEndpoints filters endpoints according to predicate
func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
filteredEndpoints := make([]Endpoint, 0, len(endpoints))
// Get all the local endpoints
for _, ep := range endpoints {
if ep.GetIsLocal() {
if predicate(ep) {
filteredEndpoints = append(filteredEndpoints, ep)
}
}

pkg/proxy/topology_test.go View File

@ -45,74 +45,105 @@ func checkExpectedEndpoints(expected sets.String, actual []Endpoint) error {
return kerrors.NewAggregate(errs)
}
func TestFilterEndpoints(t *testing.T) {
func TestCategorizeEndpoints(t *testing.T) {
testCases := []struct {
name string
hintsEnabled bool
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint
expectedEndpoints sets.String
name string
hintsEnabled bool
pteEnabled bool
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint
// We distinguish `nil` ("service doesn't use this kind of endpoints") from
// `sets.String()` ("service uses this kind of endpoints but has no endpoints").
// allEndpoints can be left unset if only one of clusterEndpoints and
// localEndpoints is set, and allEndpoints is identical to it.
// onlyRemoteEndpoints should be true if CategorizeEndpoints returns true for
// hasAnyEndpoints despite allEndpoints being empty.
clusterEndpoints sets.String
localEndpoints sets.String
allEndpoints sets.String
onlyRemoteEndpoints bool
}{{
name: "hints enabled, hints annotation == auto",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "auto"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hints, hints annotation == disabled, hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "disabled"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "disabled"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hints, hints annotation == aUto (wrong capitalization), hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "aUto"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "aUto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hints, hints annotation empty, hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false},
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "node local endpoints, hints are ignored",
name: "externalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: true, hintsAnnotation: "auto"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80"),
allEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
}, {
name: "internalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, hintsAnnotation: "auto", nodeLocalExternal: false, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80"),
allEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
}, {
name: "empty node labels",
hintsEnabled: true,
@ -121,7 +152,8 @@ func TestFilterEndpoints(t *testing.T) {
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "empty zone label",
hintsEnabled: true,
@ -130,7 +162,8 @@ func TestFilterEndpoints(t *testing.T) {
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "node in different zone, no endpoint filtering",
hintsEnabled: true,
@ -139,7 +172,8 @@ func TestFilterEndpoints(t *testing.T) {
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "normal endpoint filtering, auto annotation",
hintsEnabled: true,
@ -151,7 +185,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "unready endpoint",
hintsEnabled: true,
@ -163,7 +198,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "only unready endpoints in same zone (should not filter)",
hintsEnabled: true,
@ -175,7 +211,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.4:80", "10.1.2.5:80"),
localEndpoints: nil,
}, {
name: "normal endpoint filtering, Auto annotation",
hintsEnabled: true,
@ -187,7 +224,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hintsAnnotation empty, no filtering applied",
hintsEnabled: true,
@ -199,7 +237,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hintsAnnotation disabled, no filtering applied",
hintsEnabled: true,
@ -211,7 +250,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "missing hints, no filtering applied",
hintsEnabled: true,
@ -223,7 +263,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: nil, Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "multiple hints per endpoint, filtering includes any endpoint with zone included",
hintsEnabled: true,
@ -235,12 +276,28 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-b", "zone-d"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "internalTrafficPolicy: Local, with empty endpoints",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
endpoints: []Endpoint{},
expectedEndpoints: nil,
name: "conflicting topology and localness require merging allEndpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.3:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"),
localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80", "10.0.0.2:80"),
}, {
name: "internalTrafficPolicy: Local, with empty endpoints",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
endpoints: []Endpoint{},
clusterEndpoints: nil,
localEndpoints: sets.NewString(),
}, {
name: "internalTrafficPolicy: Local, but all endpoints are remote",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
@ -248,7 +305,9 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
expectedEndpoints: nil,
clusterEndpoints: nil,
localEndpoints: sets.NewString(),
onlyRemoteEndpoints: true,
}, {
name: "internalTrafficPolicy: Local, all endpoints are local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
@ -256,7 +315,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
},
expectedEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
clusterEndpoints: nil,
localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "internalTrafficPolicy: Local, some endpoints are local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
@ -264,17 +324,186 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
expectedEndpoints: sets.NewString("10.0.0.0:80"),
clusterEndpoints: nil,
localEndpoints: sets.NewString("10.0.0.0:80"),
}, {
name: "Cluster traffic policy, endpoints not Ready",
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false},
},
clusterEndpoints: sets.NewString(),
localEndpoints: nil,
}, {
name: "Cluster traffic policy, some endpoints are Ready",
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true},
},
clusterEndpoints: sets.NewString("10.0.0.1:80"),
localEndpoints: nil,
}, {
name: "Cluster traffic policy, PTE enabled, all endpoints are terminating",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
},
clusterEndpoints: sets.NewString(),
localEndpoints: nil,
}, {
name: "iTP: Local, eTP: Cluster, some endpoints local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: false, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString("10.0.0.0:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Cluster, eTP: Local, some endpoints local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString("10.0.0.0:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, some endpoints local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString("10.0.0.0:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, all endpoints remote",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, PTE disabled, all endpoints remote and terminating",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString(),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString(),
}, {
name: "iTP: Local, eTP: Local, PTE enabled, all endpoints remote and terminating",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString(),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString(),
onlyRemoteEndpoints: true,
}, {
name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80"),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString("10.0.0.0:80"),
}, {
name: "iTP: Cluster, eTP: Local, PTE enabled, with terminating endpoints",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80"),
localEndpoints: sets.NewString("10.0.0.2:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"),
}, {
name: "externalTrafficPolicy ignored if not externally accessible",
serviceInfo: &BaseServiceInfo{nodeLocalExternal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: nil,
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "no cluster endpoints for iTP:Local internal-only service",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
},
clusterEndpoints: nil,
localEndpoints: sets.NewString("10.0.0.1:80"),
allEndpoints: sets.NewString("10.0.0.1:80"),
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, tc.pteEnabled)()
filteredEndpoints := FilterEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)
err := checkExpectedEndpoints(tc.expectedEndpoints, filteredEndpoints)
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)
if tc.clusterEndpoints == nil && clusterEndpoints != nil {
t.Errorf("expected no cluster endpoints but got %v", clusterEndpoints)
} else {
err := checkExpectedEndpoints(tc.clusterEndpoints, clusterEndpoints)
if err != nil {
t.Errorf("error with cluster endpoints: %v", err)
}
}
if tc.localEndpoints == nil && localEndpoints != nil {
t.Errorf("expected no local endpoints but got %v", localEndpoints)
} else {
err := checkExpectedEndpoints(tc.localEndpoints, localEndpoints)
if err != nil {
t.Errorf("error with local endpoints: %v", err)
}
}
var expectedAllEndpoints sets.String
if tc.clusterEndpoints != nil && tc.localEndpoints == nil {
expectedAllEndpoints = tc.clusterEndpoints
} else if tc.localEndpoints != nil && tc.clusterEndpoints == nil {
expectedAllEndpoints = tc.localEndpoints
} else {
expectedAllEndpoints = tc.allEndpoints
}
err := checkExpectedEndpoints(expectedAllEndpoints, allEndpoints)
if err != nil {
t.Errorf(err.Error())
t.Errorf("error with allEndpoints: %v", err)
}
expectedHasAnyEndpoints := len(expectedAllEndpoints) > 0 || tc.onlyRemoteEndpoints
if expectedHasAnyEndpoints != hasAnyEndpoints {
t.Errorf("expected hasAnyEndpoints=%v, got %v", expectedHasAnyEndpoints, hasAnyEndpoints)
}
})
}
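The harness above checks all four return values. Notably, hasAnyEndpoints can distinguish "the service has no endpoints anywhere" from "endpoints exist, but none are usable from this node", which matters for services using Local traffic policy. A hedged sketch of how a consumer might branch on the results (illustrative only; the actual iptables rule generation lives in the proxier):

// Sketch (hypothetical consumer of CategorizeEndpoints):
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints :=
	CategorizeEndpoints(endpoints, svcInfo, nodeLabels)
switch {
case !hasAnyEndpoints:
	// No usable endpoints on any node: traffic to the service can be rejected.
case len(allEndpoints) == 0:
	// Endpoints exist, but none are usable from this node (for example,
	// Local traffic policy with only remote endpoints).
default:
	// Program clusterEndpoints into the Cluster-policy chain and
	// localEndpoints into the Local-policy chain, as applicable.
	_, _ = clusterEndpoints, localEndpoints
}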


@ -91,6 +91,15 @@ type ServicePort interface {
InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType
// HintsAnnotation returns the value of the v1.AnnotationTopologyAwareHints annotation.
HintsAnnotation() string
// ExternallyAccessible returns true if the service port is reachable via something
// other than ClusterIP (NodePort/ExternalIP/LoadBalancer).
ExternallyAccessible() bool
// UsesClusterEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Cluster" traffic policy.
UsesClusterEndpoints() bool
// UsesLocalEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Local" traffic policy.
UsesLocalEndpoints() bool
}
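These three accessors encode which endpoint subsets a service needs at all, which is what lets CategorizeEndpoints return nil for an unused category. A minimal sketch of plausible implementations, with the field names (nodePort, externalIPs, nodeLocalInternal, nodeLocalExternal) assumed from the tests above rather than quoted from this change:

// Sketch (hypothetical; the real BaseServiceInfo would also consider
// load balancer ingress and similar external access paths).
func (info *BaseServiceInfo) ExternallyAccessible() bool {
	return info.nodePort != 0 || len(info.externalIPs) != 0
}

func (info *BaseServiceInfo) UsesClusterEndpoints() bool {
	// Cluster-policy endpoints are needed unless internalTrafficPolicy is
	// Local and there is no external access path at all.
	return !info.nodeLocalInternal || info.ExternallyAccessible()
}

func (info *BaseServiceInfo) UsesLocalEndpoints() bool {
	return info.nodeLocalInternal || (info.nodeLocalExternal && info.ExternallyAccessible())
}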
// Endpoint is an interface which abstracts information about an endpoint.