proxy/iptables: fix internal-vs-external traffic policy handling

Fix internal and external traffic policy to be handled separately (so
that, in particular, services with Local internal traffic policy and
Cluster external traffic policy do not behave as though they had Local
external traffic policy as well.

Additionally, traffic to an `internalTrafficPolicy: Local` service on
a node with no endpoints is now dropped rather than being rejected
(which, as in the external case, may prevent traffic from being lost
when endpoints are in flux).
This commit is contained in:
Dan Winship 2021-11-16 10:49:55 -05:00
parent 2e780ecd99
commit 548cf9d5de
7 changed files with 494 additions and 215 deletions

View File

@ -38,11 +38,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/events"
utilsysctl "k8s.io/component-helpers/node/util/sysctl"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/proxy"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/metaproxier"
@ -694,14 +692,15 @@ const (
)
// servicePortChainName returns the name of the KUBE-SVC-XXXX chain for a service, which is the
// main iptables chain for that service.
// main iptables chain for that service, used for dispatching to endpoints when using `Cluster`
// traffic policy.
func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain(servicePortChainNamePrefix + portProtoHash(servicePortName, protocol))
}
// serviceLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which
// handles dispatching to local endpoints when using `Local` traffic policy. This chain only
// exists if the service has `Local` external traffic policy.
// exists if the service has `Local` internal or external traffic policy.
func serviceLocalChainName(servicePortName string, protocol string) utiliptables.Chain {
return utiliptables.Chain(serviceLocalChainNamePrefix + portProtoHash(servicePortName, protocol))
}
@ -963,15 +962,6 @@ func (proxier *Proxier) syncProxyRules() {
// Accumulate NAT chains to keep.
activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set
// We are creating those slices ones here to avoid memory reallocations
// in every loop. Note that reuse the memory, instead of doing:
// slice = <some new slice>
// you should always do one of the below:
// slice = slice[:0] // and then append to it
// slice = append(slice[:0], ...)
readyEndpointChains := make([]utiliptables.Chain, 0)
localEndpointChains := make([]utiliptables.Chain, 0)
// To avoid growing this slice, we arbitrarily set its size to 64,
// there is never more than that many arguments for a single line.
// Note that even if we go over 64, it will still be correct - it
@ -1012,36 +1002,14 @@ func (proxier *Proxier) syncProxyRules() {
allEndpoints := proxier.endpointsMap[svcName]
// Filtering for topology aware endpoints. This function will only
// filter endpoints if appropriate feature gates are enabled and the
// Service does not have conflicting configuration such as
// externalTrafficPolicy=Local.
allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
// Scan the endpoints list to see what we have. "hasEndpoints" will be true
// if there are any usable endpoints for this service anywhere in the cluster.
var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
for _, ep := range allEndpoints {
if ep.IsReady() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalReadyEndpoints = true
}
} else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if ep.IsServing() && ep.IsTerminating() {
hasEndpoints = true
if ep.GetIsLocal() {
hasLocalServingTerminatingEndpoints = true
}
}
}
}
useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints
// Figure out the endpoints for Cluster and Local traffic policy.
// allLocallyReachableEndpoints is the set of all endpoints that can be reached
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
// Generate the per-endpoint chains.
readyEndpointChains = readyEndpointChains[:0]
localEndpointChains = localEndpointChains[:0]
for _, ep := range allEndpoints {
for _, ep := range allLocallyReachableEndpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep)
@ -1049,27 +1017,6 @@ func (proxier *Proxier) syncProxyRules() {
}
endpointChain := epInfo.ChainName
endpointInUse := false
if epInfo.Ready {
readyEndpointChains = append(readyEndpointChains, endpointChain)
endpointInUse = true
}
if svc.NodeLocalExternal() && epInfo.IsLocal {
if useTerminatingEndpoints {
if epInfo.Serving && epInfo.Terminating {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
} else if epInfo.Ready {
localEndpointChains = append(localEndpointChains, endpointChain)
endpointInUse = true
}
}
if !endpointInUse {
continue
}
// Create the endpoint chain, retaining counters if possible.
if chain, ok := existingNATChains[endpointChain]; ok {
@ -1096,8 +1043,21 @@ func (proxier *Proxier) syncProxyRules() {
}
svcChain := svcInfo.servicePortChainName
if hasEndpoints {
// Create the per-service chain, retaining counters if possible.
svcLocalChain := svcInfo.serviceLocalChainName
svcXlbChain := svcInfo.serviceLBChainName
internalTrafficChain := svcChain
externalTrafficChain := svcChain
if svcInfo.NodeLocalInternal() {
internalTrafficChain = svcLocalChain
}
if svcInfo.NodeLocalExternal() {
externalTrafficChain = svcXlbChain
}
if hasEndpoints && svcInfo.UsesClusterEndpoints() {
// Create the Cluster traffic policy chain, retaining counters if possible.
if chain, ok := existingNATChains[svcChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
@ -1105,18 +1065,8 @@ func (proxier *Proxier) syncProxyRules() {
}
activeNATChains[svcChain] = true
}
externalTrafficChain := svcChain
svcLocalChain := svcInfo.serviceLocalChainName
svcXlbChain := svcInfo.serviceLBChainName
if hasEndpoints && svcInfo.NodeLocalExternal() {
// create the per-service LB and Local chains, retaining counters if possible.
if chain, ok := existingNATChains[svcLocalChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(svcLocalChain))
}
activeNATChains[svcLocalChain] = true
if hasEndpoints && svcInfo.ExternallyAccessible() && svcInfo.NodeLocalExternal() {
if chain, ok := existingNATChains[svcXlbChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
@ -1157,8 +1107,15 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natRules.Write(
"-A", string(svcXlbChain),
"-j", string(svcLocalChain))
}
externalTrafficChain = svcXlbChain
if hasEndpoints && svcInfo.UsesLocalEndpoints() {
if chain, ok := existingNATChains[svcLocalChain]; ok {
proxier.natChains.WriteBytes(chain)
} else {
proxier.natChains.Write(utiliptables.MakeChainLine(svcLocalChain))
}
activeNATChains[svcLocalChain] = true
}
// Capture the clusterIP.
@ -1171,7 +1128,7 @@ func (proxier *Proxier) syncProxyRules() {
)
if proxier.masqueradeAll {
proxier.natRules.Write(
"-A", string(svcChain),
"-A", string(internalTrafficChain),
args,
"-j", string(KubeMarkMasqChain))
} else if proxier.localDetector.IsImplemented() {
@ -1181,7 +1138,7 @@ func (proxier *Proxier) syncProxyRules() {
// for you. Since that might bounce off-node, we masquerade here.
// If/when we support "Local" policy for VIPs, we should update this.
proxier.natRules.Write(
"-A", string(svcChain),
"-A", string(internalTrafficChain),
args,
proxier.localDetector.IfNotLocal(),
"-j", string(KubeMarkMasqChain))
@ -1189,7 +1146,7 @@ func (proxier *Proxier) syncProxyRules() {
proxier.natRules.Write(
"-A", string(kubeServicesChain),
args,
"-j", string(svcChain))
"-j", string(internalTrafficChain))
} else {
// No endpoints.
proxier.filterRules.Write(
@ -1333,7 +1290,6 @@ func (proxier *Proxier) syncProxyRules() {
// worthwhile to make a new per-service chain for nodeport rules, but
// with just 2 rules it ends up being a waste and a cognitive burden.
if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 {
if hasEndpoints {
args = append(args[:0],
"-m", "comment", "--comment", svcNameString,
@ -1391,20 +1347,15 @@ func (proxier *Proxier) syncProxyRules() {
)
}
if !hasEndpoints {
continue
if len(clusterEndpoints) != 0 {
// Write rules jumping from svcChain to clusterEndpoints
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcChain, clusterEndpoints, args)
}
// Write rules jumping from svcChain to readyEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcChain, readyEndpointChains, args)
// The logic below this applies only if this service is marked as OnlyLocal
if !svcInfo.NodeLocalExternal() {
continue
}
numLocalEndpoints := len(localEndpointChains)
if numLocalEndpoints == 0 {
if len(localEndpoints) != 0 {
// Write rules jumping from svcLocalChain to localEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcLocalChain, localEndpoints, args)
} else if hasEndpoints && svcInfo.UsesLocalEndpoints() {
// Blackhole all traffic since there are no local endpoints
args = append(args[:0],
"-A", string(svcLocalChain),
@ -1414,9 +1365,6 @@ func (proxier *Proxier) syncProxyRules() {
string(KubeMarkDropChain),
)
proxier.natRules.Write(args)
} else {
// Write rules jumping from svcLocalChain to localEndpointChains
proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcLocalChain, localEndpointChains, args)
}
}
@ -1575,27 +1523,35 @@ func (proxier *Proxier) syncProxyRules() {
proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
}
func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpointChains []utiliptables.Chain, args []string) {
func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP {
for _, endpointChain := range endpointChains {
for _, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
continue
}
args = append(args[:0],
"-A", string(svcChain),
)
args = proxier.appendServiceCommentLocked(args, svcNameString)
args = append(args,
"-m", "recent", "--name", string(endpointChain),
"-m", "recent", "--name", string(epInfo.ChainName),
"--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap",
"-j", string(endpointChain),
"-j", string(epInfo.ChainName),
)
proxier.natRules.Write(args)
}
}
// Now write loadbalancing rules.
numEndpoints := len(endpointChains)
for i, endpointChain := range endpointChains {
// Balancing rules in the per-service chain.
numEndpoints := len(endpoints)
for i, ep := range endpoints {
epInfo, ok := ep.(*endpointsInfo)
if !ok {
continue
}
args = append(args[:0], "-A", string(svcChain))
args = proxier.appendServiceCommentLocked(args, svcNameString)
if i < (numEndpoints - 1) {
@ -1606,6 +1562,6 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInf
"--probability", proxier.probability(numEndpoints-i))
}
// The final (or only if n == 1) rule is a guaranteed match.
proxier.natRules.Write(args, "-j", string(endpointChain))
proxier.natRules.Write(args, "-j", string(epInfo.ChainName))
}
}

View File

@ -4432,15 +4432,15 @@ COMMIT
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
:KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP
-A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ
-A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
@ -4463,7 +4463,6 @@ COMMIT
:KUBE-EXTERNAL-SERVICES - [0:0]
:KUBE-FORWARD - [0:0]
:KUBE-NODEPORTS - [0:0]
-A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT
-A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT
-A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT
@ -4473,10 +4472,14 @@ COMMIT
:KUBE-NODEPORTS - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0]
-A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN
-A KUBE-POSTROUTING -j MARK --xor-mark 0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE
-A KUBE-MARK-MASQ -j MARK --or-mark 0x4000
-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ
-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP
-A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS
COMMIT
`,

View File

@ -1951,14 +1951,6 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
// curEndpoints represents IPVS destinations listed from current system.
curEndpoints := sets.NewString()
// readyEndpoints represents Endpoints watched from API Server.
readyEndpoints := sets.NewString()
// localReadyEndpoints represents local endpoints that are ready and NOT terminating.
localReadyEndpoints := sets.NewString()
// localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating.
// Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic.
localReadyTerminatingEndpoints := sets.NewString()
curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer)
if err != nil {
klog.ErrorS(err, "Failed to list IPVS destinations")
@ -1978,32 +1970,17 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if !ok {
klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
} else {
endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels)
clusterEndpoints, localEndpoints, _, _ := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
if onlyNodeLocalEndpoints {
endpoints = localEndpoints
} else {
endpoints = clusterEndpoints
}
}
newEndpoints := sets.NewString()
for _, epInfo := range endpoints {
if epInfo.IsReady() {
readyEndpoints.Insert(epInfo.String())
}
if onlyNodeLocalEndpoints && epInfo.GetIsLocal() {
if epInfo.IsReady() {
localReadyEndpoints.Insert(epInfo.String())
} else if epInfo.IsServing() && epInfo.IsTerminating() {
localReadyTerminatingEndpoints.Insert(epInfo.String())
}
}
}
newEndpoints := readyEndpoints
if onlyNodeLocalEndpoints {
newEndpoints = localReadyEndpoints
if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 {
newEndpoints = localReadyTerminatingEndpoints
}
}
newEndpoints.Insert(epInfo.String())
}
// Create new endpoints

View File

@ -139,6 +139,24 @@ func (info *BaseServiceInfo) HintsAnnotation() string {
return info.hintsAnnotation
}
// ExternallyAccessible is part of ServicePort interface.
func (info *BaseServiceInfo) ExternallyAccessible() bool {
return info.nodePort != 0 || len(info.loadBalancerStatus.Ingress) != 0 || len(info.externalIPs) != 0
}
// UsesClusterEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) UsesClusterEndpoints() bool {
// The service port uses Cluster endpoints if the internal traffic policy is "Cluster",
// or if it accepts external traffic at all. (Even if the external traffic policy is
// "Local", we need Cluster endpoints to implement short circuiting.)
return !info.nodeLocalInternal || info.ExternallyAccessible()
}
// UsesLocalEndpoints is part of ServicePort interface.
func (info *BaseServiceInfo) UsesLocalEndpoints() bool {
return info.nodeLocalInternal || (info.nodeLocalExternal && info.ExternallyAccessible())
}
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
nodeLocalExternal := false
if apiservice.RequestsOnlyLocalTraffic(service) {

View File

@ -23,78 +23,165 @@ import (
"k8s.io/kubernetes/pkg/features"
)
// FilterEndpoints filters endpoints based on Service configuration, node
// labels, and enabled feature gates. This is primarily used to enable topology
// aware routing.
func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) []Endpoint {
if svcInfo.NodeLocalExternal() {
return endpoints
// CategorizeEndpoints returns:
//
// - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if
// relevant). This will be nil if the service does not ever use Cluster traffic policy.
//
// - The service's usable Local-traffic-policy endpoints (including terminating endpoints, if
// relevant). This will be nil if the service does not ever use Local traffic policy.
//
// - The combined list of all endpoints reachable from this node (which is the union of the
// previous two lists, but in the case where it is identical to one or the other, we avoid
// allocating a separate list).
//
// - An indication of whether the service has any endpoints reachable from anywhere in the
// cluster. (This may be true even if allReachableEndpoints is empty.)
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
var useTopology, useServingTerminatingEndpoints bool
if svcInfo.UsesClusterEndpoints() {
useTopology = canUseTopology(endpoints, svcInfo, nodeLabels)
clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
if !ep.IsReady() {
return false
}
if useTopology && !availableForTopology(ep, nodeLabels) {
return false
}
return true
})
// If there are any Ready endpoints anywhere in the cluster, we are
// guaranteed to get one in clusterEndpoints.
if len(clusterEndpoints) > 0 {
hasAnyEndpoints = true
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() {
return FilterLocalEndpoint(endpoints)
if !svcInfo.UsesLocalEndpoints() {
allReachableEndpoints = clusterEndpoints
return
}
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
return filterEndpointsWithHints(endpoints, svcInfo.HintsAnnotation(), nodeLabels)
// Pre-scan the endpoints, to figure out which type of endpoint Local
// traffic policy will use, and also to see if there are any usable
// endpoints anywhere in the cluster.
var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool
for _, ep := range endpoints {
if ep.IsReady() {
hasAnyEndpoints = true
if ep.GetIsLocal() {
hasLocalReadyEndpoints = true
}
} else if ep.IsServing() && ep.IsTerminating() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) {
hasAnyEndpoints = true
if ep.GetIsLocal() {
hasLocalServingTerminatingEndpoints = true
}
}
}
return endpoints
if hasLocalReadyEndpoints {
localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
return ep.GetIsLocal() && ep.IsReady()
})
} else if hasLocalServingTerminatingEndpoints {
useServingTerminatingEndpoints = true
localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
return ep.GetIsLocal() && ep.IsServing() && ep.IsTerminating()
})
}
if !svcInfo.UsesClusterEndpoints() {
allReachableEndpoints = localEndpoints
return
}
if !useTopology && !useServingTerminatingEndpoints {
// !useServingTerminatingEndpoints means that localEndpoints contains only
// Ready endpoints. !useTopology means that clusterEndpoints contains *every*
// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
allReachableEndpoints = clusterEndpoints
return
}
// clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while
// localEndpoints may contain terminating or topologically-unavailable local endpoints
// that aren't in clusterEndpoints. So we have to merge the two lists.
endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints))
for _, ep := range clusterEndpoints {
endpointsMap[ep.String()] = ep
}
for _, ep := range localEndpoints {
endpointsMap[ep.String()] = ep
}
allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap))
for _, ep := range endpointsMap {
allReachableEndpoints = append(allReachableEndpoints, ep)
}
return
}
// filterEndpointsWithHints provides filtering based on the hints included in
// EndpointSlices. If any of the following are true, the full list of endpoints
// will be returned without any filtering:
// * The AnnotationTopologyAwareHints annotation is not set to "Auto" for this
// Service.
// * No zone is specified in node labels.
// * No endpoints for this Service have a hint pointing to the zone this
// instance of kube-proxy is running in.
// * One or more endpoints for this Service do not have hints specified.
func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, nodeLabels map[string]string) []Endpoint {
// canUseTopology returns true if topology aware routing is enabled and properly configured
// in this cluster. That is, it checks that:
// * The TopologyAwareHints feature is enabled
// * The "service.kubernetes.io/topology-aware-hints" annotation on this Service is set to "Auto"
// * The node's labels include "topology.kubernetes.io/zone"
// * All of the endpoints for this Service have a topology hint
// * At least one endpoint for this Service is hinted for this node's zone.
func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
hintsAnnotation := svcInfo.HintsAnnotation()
if hintsAnnotation != "Auto" && hintsAnnotation != "auto" {
if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" {
klog.InfoS("Skipping topology aware endpoint filtering since Service has unexpected value", "annotationTopologyAwareHints", v1.AnnotationTopologyAwareHints, "hints", hintsAnnotation)
}
return endpoints
return false
}
zone, ok := nodeLabels[v1.LabelTopologyZone]
if !ok || zone == "" {
klog.InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
return endpoints
return false
}
filteredEndpoints := []Endpoint{}
hasEndpointForZone := false
for _, endpoint := range endpoints {
if !endpoint.IsReady() {
continue
}
if endpoint.GetZoneHints().Len() == 0 {
klog.InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint")
return endpoints
return false
}
if endpoint.GetZoneHints().Has(zone) {
filteredEndpoints = append(filteredEndpoints, endpoint)
hasEndpointForZone = true
}
}
if len(filteredEndpoints) == 0 {
if !hasEndpointForZone {
klog.InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
return endpoints
return false
}
return filteredEndpoints
return true
}
// FilterLocalEndpoint returns the node local endpoints
func FilterLocalEndpoint(endpoints []Endpoint) []Endpoint {
var filteredEndpoints []Endpoint
// availableForTopology checks if this endpoint is available for use on this node, given
// topology constraints. (It assumes that canUseTopology() returned true.)
func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
zone := nodeLabels[v1.LabelTopologyZone]
return endpoint.GetZoneHints().Has(zone)
}
// filterEndpoints filters endpoints according to predicate
func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
filteredEndpoints := make([]Endpoint, 0, len(endpoints))
// Get all the local endpoints
for _, ep := range endpoints {
if ep.GetIsLocal() {
if predicate(ep) {
filteredEndpoints = append(filteredEndpoints, ep)
}
}

View File

@ -45,74 +45,105 @@ func checkExpectedEndpoints(expected sets.String, actual []Endpoint) error {
return kerrors.NewAggregate(errs)
}
func TestFilterEndpoints(t *testing.T) {
func TestCategorizeEndpoints(t *testing.T) {
testCases := []struct {
name string
hintsEnabled bool
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint
expectedEndpoints sets.String
name string
hintsEnabled bool
pteEnabled bool
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint
// We distinguish `nil` ("service doesn't use this kind of endpoints") from
// `sets.String()` ("service uses this kind of endpoints but has no endpoints").
// allEndpoints can be left unset if only one of clusterEndpoints and
// localEndpoints is set, and allEndpoints is identical to it.
// onlyRemoteEndpoints should be true if CategorizeEndpoints returns true for
// hasAnyEndpoints despite allEndpoints being empty.
clusterEndpoints sets.String
localEndpoints sets.String
allEndpoints sets.String
onlyRemoteEndpoints bool
}{{
name: "hints enabled, hints annotation == auto",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "auto"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hints, hints annotation == disabled, hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "disabled"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "disabled"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hints, hints annotation == aUto (wrong capitalization), hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "aUto"},
serviceInfo: &BaseServiceInfo{hintsAnnotation: "aUto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hints, hints annotation empty, hints ignored",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: false},
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "node local endpoints, hints are ignored",
name: "externalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: true, hintsAnnotation: "auto"},
serviceInfo: &BaseServiceInfo{nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80"),
allEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
}, {
name: "internalTrafficPolicy: Local, topology ignored for Local endpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, hintsAnnotation: "auto", nodeLocalExternal: false, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80"),
allEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
}, {
name: "empty node labels",
hintsEnabled: true,
@ -121,7 +152,8 @@ func TestFilterEndpoints(t *testing.T) {
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "empty zone label",
hintsEnabled: true,
@ -130,7 +162,8 @@ func TestFilterEndpoints(t *testing.T) {
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "node in different zone, no endpoint filtering",
hintsEnabled: true,
@ -139,7 +172,8 @@ func TestFilterEndpoints(t *testing.T) {
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "normal endpoint filtering, auto annotation",
hintsEnabled: true,
@ -151,7 +185,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "unready endpoint",
hintsEnabled: true,
@ -163,7 +198,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false},
},
expectedEndpoints: sets.NewString("10.1.2.3:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "only unready endpoints in same zone (should not filter)",
hintsEnabled: true,
@ -175,7 +211,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.4:80", "10.1.2.5:80"),
localEndpoints: nil,
}, {
name: "normal endpoint filtering, Auto annotation",
hintsEnabled: true,
@ -187,7 +224,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hintsAnnotation empty, no filtering applied",
hintsEnabled: true,
@ -199,7 +237,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "hintsAnnotation disabled, no filtering applied",
hintsEnabled: true,
@ -211,7 +250,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "missing hints, no filtering applied",
hintsEnabled: true,
@ -223,7 +263,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: nil, Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "multiple hints per endpoint, filtering includes any endpoint with zone included",
hintsEnabled: true,
@ -235,12 +276,28 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-b", "zone-d"), Ready: true},
&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
},
expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "internalTrafficPolicy: Local, with empty endpoints",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
endpoints: []Endpoint{},
expectedEndpoints: nil,
name: "conflicting topology and localness require merging allEndpoints",
hintsEnabled: true,
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.3:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"),
localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80", "10.0.0.2:80"),
}, {
name: "internalTrafficPolicy: Local, with empty endpoints",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
endpoints: []Endpoint{},
clusterEndpoints: nil,
localEndpoints: sets.NewString(),
}, {
name: "internalTrafficPolicy: Local, but all endpoints are remote",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
@ -248,7 +305,9 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
expectedEndpoints: nil,
clusterEndpoints: nil,
localEndpoints: sets.NewString(),
onlyRemoteEndpoints: true,
}, {
name: "internalTrafficPolicy: Local, all endpoints are local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
@ -256,7 +315,8 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
},
expectedEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
clusterEndpoints: nil,
localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "internalTrafficPolicy: Local, some endpoints are local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
@ -264,17 +324,186 @@ func TestFilterEndpoints(t *testing.T) {
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
expectedEndpoints: sets.NewString("10.0.0.0:80"),
clusterEndpoints: nil,
localEndpoints: sets.NewString("10.0.0.0:80"),
}, {
name: "Cluster traffic policy, endpoints not Ready",
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false},
},
clusterEndpoints: sets.NewString(),
localEndpoints: nil,
}, {
name: "Cluster traffic policy, some endpoints are Ready",
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true},
},
clusterEndpoints: sets.NewString("10.0.0.1:80"),
localEndpoints: nil,
}, {
name: "Cluster traffic policy, PTE enabled, all endpoints are terminating",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
},
clusterEndpoints: sets.NewString(),
localEndpoints: nil,
}, {
name: "iTP: Local, eTP: Cluster, some endpoints local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: false, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString("10.0.0.0:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Cluster, eTP: Local, some endpoints local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString("10.0.0.0:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, some endpoints local",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString("10.0.0.0:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, all endpoints remote",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "iTP: Local, eTP: Local, PTE disabled, all endpoints remote and terminating",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString(),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString(),
}, {
name: "iTP: Local, eTP: Local, PTE enabled, all endpoints remote and terminating",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString(),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString(),
onlyRemoteEndpoints: true,
}, {
name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80"),
localEndpoints: sets.NewString(),
allEndpoints: sets.NewString("10.0.0.0:80"),
}, {
name: "iTP: Cluster, eTP: Local, PTE enabled, with terminating endpoints",
pteEnabled: true,
serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true},
&BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false},
},
clusterEndpoints: sets.NewString("10.0.0.0:80"),
localEndpoints: sets.NewString("10.0.0.2:80"),
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"),
}, {
name: "externalTrafficPolicy ignored if not externally accessible",
serviceInfo: &BaseServiceInfo{nodeLocalExternal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
},
clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
localEndpoints: nil,
allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"),
}, {
name: "no cluster endpoints for iTP:Local internal-only service",
serviceInfo: &BaseServiceInfo{nodeLocalInternal: true},
endpoints: []Endpoint{
&BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false},
&BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true},
},
clusterEndpoints: nil,
localEndpoints: sets.NewString("10.0.0.1:80"),
allEndpoints: sets.NewString("10.0.0.1:80"),
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, tc.pteEnabled)()
filteredEndpoints := FilterEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)
err := checkExpectedEndpoints(tc.expectedEndpoints, filteredEndpoints)
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)
if tc.clusterEndpoints == nil && clusterEndpoints != nil {
t.Errorf("expected no cluster endpoints but got %v", clusterEndpoints)
} else {
err := checkExpectedEndpoints(tc.clusterEndpoints, clusterEndpoints)
if err != nil {
t.Errorf("error with cluster endpoints: %v", err)
}
}
if tc.localEndpoints == nil && localEndpoints != nil {
t.Errorf("expected no local endpoints but got %v", localEndpoints)
} else {
err := checkExpectedEndpoints(tc.localEndpoints, localEndpoints)
if err != nil {
t.Errorf("error with local endpoints: %v", err)
}
}
var expectedAllEndpoints sets.String
if tc.clusterEndpoints != nil && tc.localEndpoints == nil {
expectedAllEndpoints = tc.clusterEndpoints
} else if tc.localEndpoints != nil && tc.clusterEndpoints == nil {
expectedAllEndpoints = tc.localEndpoints
} else {
expectedAllEndpoints = tc.allEndpoints
}
err := checkExpectedEndpoints(expectedAllEndpoints, allEndpoints)
if err != nil {
t.Errorf(err.Error())
t.Errorf("error with allEndpoints: %v", err)
}
expectedHasAnyEndpoints := len(expectedAllEndpoints) > 0 || tc.onlyRemoteEndpoints
if expectedHasAnyEndpoints != hasAnyEndpoints {
t.Errorf("expected hasAnyEndpoints=%v, got %v", expectedHasAnyEndpoints, hasAnyEndpoints)
}
})
}

View File

@ -91,6 +91,15 @@ type ServicePort interface {
InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType
// HintsAnnotation returns the value of the v1.AnnotationTopologyAwareHints annotation.
HintsAnnotation() string
// ExternallyAccessible returns true if the service port is reachable via something
// other than ClusterIP (NodePort/ExternalIP/LoadBalancer)
ExternallyAccessible() bool
// UsesClusterEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Cluster" traffic policy
UsesClusterEndpoints() bool
// UsesLocalEndpoints returns true if the service port ever sends traffic to
// endpoints based on "Local" traffic policy
UsesLocalEndpoints() bool
}
// Endpoint in an interface which abstracts information about an endpoint.