diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index aebb5843136..2a7449ac685 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,11 +38,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" @@ -694,14 +692,15 @@ const ( ) // servicePortChainName returns the name of the KUBE-SVC-XXXX chain for a service, which is the -// main iptables chain for that service. +// main iptables chain for that service, used for dispatching to endpoints when using `Cluster` +// traffic policy. func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain { return utiliptables.Chain(servicePortChainNamePrefix + portProtoHash(servicePortName, protocol)) } // serviceLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which // handles dispatching to local endpoints when using `Local` traffic policy. This chain only -// exists if the service has `Local` external traffic policy. +// exists if the service has `Local` internal or external traffic policy. func serviceLocalChainName(servicePortName string, protocol string) utiliptables.Chain { return utiliptables.Chain(serviceLocalChainNamePrefix + portProtoHash(servicePortName, protocol)) } @@ -963,15 +962,6 @@ func (proxier *Proxier) syncProxyRules() { // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - // We are creating those slices ones here to avoid memory reallocations - // in every loop. Note that reuse the memory, instead of doing: - // slice = - // you should always do one of the below: - // slice = slice[:0] // and then append to it - // slice = append(slice[:0], ...) - readyEndpointChains := make([]utiliptables.Chain, 0) - localEndpointChains := make([]utiliptables.Chain, 0) - // To avoid growing this slice, we arbitrarily set its size to 64, // there is never more than that many arguments for a single line. // Note that even if we go over 64, it will still be correct - it @@ -1012,36 +1002,14 @@ func (proxier *Proxier) syncProxyRules() { allEndpoints := proxier.endpointsMap[svcName] - // Filtering for topology aware endpoints. This function will only - // filter endpoints if appropriate feature gates are enabled and the - // Service does not have conflicting configuration such as - // externalTrafficPolicy=Local. - allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) - - // Scan the endpoints list to see what we have. "hasEndpoints" will be true - // if there are any usable endpoints for this service anywhere in the cluster. - var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool - for _, ep := range allEndpoints { - if ep.IsReady() { - hasEndpoints = true - if ep.GetIsLocal() { - hasLocalReadyEndpoints = true - } - } else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { - if ep.IsServing() && ep.IsTerminating() { - hasEndpoints = true - if ep.GetIsLocal() { - hasLocalServingTerminatingEndpoints = true - } - } - } - } - useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints + // Figure out the endpoints for Cluster and Local traffic policy. + // allLocallyReachableEndpoints is the set of all endpoints that can be reached + // from this node, given the service's traffic policies. hasEndpoints is true + // if the service has any usable endpoints on any node, not just this one. + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) // Generate the per-endpoint chains. - readyEndpointChains = readyEndpointChains[:0] - localEndpointChains = localEndpointChains[:0] - for _, ep := range allEndpoints { + for _, ep := range allLocallyReachableEndpoints { epInfo, ok := ep.(*endpointsInfo) if !ok { klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep) @@ -1049,27 +1017,6 @@ func (proxier *Proxier) syncProxyRules() { } endpointChain := epInfo.ChainName - endpointInUse := false - - if epInfo.Ready { - readyEndpointChains = append(readyEndpointChains, endpointChain) - endpointInUse = true - } - if svc.NodeLocalExternal() && epInfo.IsLocal { - if useTerminatingEndpoints { - if epInfo.Serving && epInfo.Terminating { - localEndpointChains = append(localEndpointChains, endpointChain) - endpointInUse = true - } - } else if epInfo.Ready { - localEndpointChains = append(localEndpointChains, endpointChain) - endpointInUse = true - } - } - - if !endpointInUse { - continue - } // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[endpointChain]; ok { @@ -1096,8 +1043,21 @@ func (proxier *Proxier) syncProxyRules() { } svcChain := svcInfo.servicePortChainName - if hasEndpoints { - // Create the per-service chain, retaining counters if possible. + svcLocalChain := svcInfo.serviceLocalChainName + svcXlbChain := svcInfo.serviceLBChainName + + internalTrafficChain := svcChain + externalTrafficChain := svcChain + + if svcInfo.NodeLocalInternal() { + internalTrafficChain = svcLocalChain + } + if svcInfo.NodeLocalExternal() { + externalTrafficChain = svcXlbChain + } + + if hasEndpoints && svcInfo.UsesClusterEndpoints() { + // Create the Cluster traffic policy chain, retaining counters if possible. if chain, ok := existingNATChains[svcChain]; ok { proxier.natChains.WriteBytes(chain) } else { @@ -1105,18 +1065,8 @@ func (proxier *Proxier) syncProxyRules() { } activeNATChains[svcChain] = true } - externalTrafficChain := svcChain - svcLocalChain := svcInfo.serviceLocalChainName - svcXlbChain := svcInfo.serviceLBChainName - if hasEndpoints && svcInfo.NodeLocalExternal() { - // create the per-service LB and Local chains, retaining counters if possible. - if chain, ok := existingNATChains[svcLocalChain]; ok { - proxier.natChains.WriteBytes(chain) - } else { - proxier.natChains.Write(utiliptables.MakeChainLine(svcLocalChain)) - } - activeNATChains[svcLocalChain] = true + if hasEndpoints && svcInfo.ExternallyAccessible() && svcInfo.NodeLocalExternal() { if chain, ok := existingNATChains[svcXlbChain]; ok { proxier.natChains.WriteBytes(chain) } else { @@ -1157,8 +1107,15 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Write( "-A", string(svcXlbChain), "-j", string(svcLocalChain)) + } - externalTrafficChain = svcXlbChain + if hasEndpoints && svcInfo.UsesLocalEndpoints() { + if chain, ok := existingNATChains[svcLocalChain]; ok { + proxier.natChains.WriteBytes(chain) + } else { + proxier.natChains.Write(utiliptables.MakeChainLine(svcLocalChain)) + } + activeNATChains[svcLocalChain] = true } // Capture the clusterIP. @@ -1171,7 +1128,7 @@ func (proxier *Proxier) syncProxyRules() { ) if proxier.masqueradeAll { proxier.natRules.Write( - "-A", string(svcChain), + "-A", string(internalTrafficChain), args, "-j", string(KubeMarkMasqChain)) } else if proxier.localDetector.IsImplemented() { @@ -1181,7 +1138,7 @@ func (proxier *Proxier) syncProxyRules() { // for you. Since that might bounce off-node, we masquerade here. // If/when we support "Local" policy for VIPs, we should update this. proxier.natRules.Write( - "-A", string(svcChain), + "-A", string(internalTrafficChain), args, proxier.localDetector.IfNotLocal(), "-j", string(KubeMarkMasqChain)) @@ -1189,7 +1146,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Write( "-A", string(kubeServicesChain), args, - "-j", string(svcChain)) + "-j", string(internalTrafficChain)) } else { // No endpoints. proxier.filterRules.Write( @@ -1333,7 +1290,6 @@ func (proxier *Proxier) syncProxyRules() { // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 { - if hasEndpoints { args = append(args[:0], "-m", "comment", "--comment", svcNameString, @@ -1391,20 +1347,15 @@ func (proxier *Proxier) syncProxyRules() { ) } - if !hasEndpoints { - continue + if len(clusterEndpoints) != 0 { + // Write rules jumping from svcChain to clusterEndpoints + proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcChain, clusterEndpoints, args) } - // Write rules jumping from svcChain to readyEndpointChains - proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcChain, readyEndpointChains, args) - - // The logic below this applies only if this service is marked as OnlyLocal - if !svcInfo.NodeLocalExternal() { - continue - } - - numLocalEndpoints := len(localEndpointChains) - if numLocalEndpoints == 0 { + if len(localEndpoints) != 0 { + // Write rules jumping from svcLocalChain to localEndpointChains + proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcLocalChain, localEndpoints, args) + } else if hasEndpoints && svcInfo.UsesLocalEndpoints() { // Blackhole all traffic since there are no local endpoints args = append(args[:0], "-A", string(svcLocalChain), @@ -1414,9 +1365,6 @@ func (proxier *Proxier) syncProxyRules() { string(KubeMarkDropChain), ) proxier.natRules.Write(args) - } else { - // Write rules jumping from svcLocalChain to localEndpointChains - proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcLocalChain, localEndpointChains, args) } } @@ -1575,27 +1523,35 @@ func (proxier *Proxier) syncProxyRules() { proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints) } -func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpointChains []utiliptables.Chain, args []string) { +func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) { // First write session affinity rules, if applicable. if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { - for _, endpointChain := range endpointChains { + for _, ep := range endpoints { + epInfo, ok := ep.(*endpointsInfo) + if !ok { + continue + } args = append(args[:0], "-A", string(svcChain), ) args = proxier.appendServiceCommentLocked(args, svcNameString) args = append(args, - "-m", "recent", "--name", string(endpointChain), + "-m", "recent", "--name", string(epInfo.ChainName), "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", - "-j", string(endpointChain), + "-j", string(epInfo.ChainName), ) proxier.natRules.Write(args) } } // Now write loadbalancing rules. - numEndpoints := len(endpointChains) - for i, endpointChain := range endpointChains { - // Balancing rules in the per-service chain. + numEndpoints := len(endpoints) + for i, ep := range endpoints { + epInfo, ok := ep.(*endpointsInfo) + if !ok { + continue + } + args = append(args[:0], "-A", string(svcChain)) args = proxier.appendServiceCommentLocked(args, svcNameString) if i < (numEndpoints - 1) { @@ -1606,6 +1562,6 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInf "--probability", proxier.probability(numEndpoints-i)) } // The final (or only if n == 1) rule is a guaranteed match. - proxier.natRules.Write(args, "-j", string(endpointChain)) + proxier.natRules.Write(args, "-j", string(epInfo.ChainName)) } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 5e83fb5ed19..543abcd24f5 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -4432,15 +4432,15 @@ COMMIT :KUBE-NODEPORTS - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] -:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ --A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4 -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS @@ -4463,7 +4463,6 @@ COMMIT :KUBE-EXTERNAL-SERVICES - [0:0] :KUBE-FORWARD - [0:0] :KUBE-NODEPORTS - [0:0] --A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT @@ -4473,10 +4472,14 @@ COMMIT :KUBE-NODEPORTS - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT `, diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index e424fe6e499..a6aeac3c0b2 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1951,14 +1951,6 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // curEndpoints represents IPVS destinations listed from current system. curEndpoints := sets.NewString() - // readyEndpoints represents Endpoints watched from API Server. - readyEndpoints := sets.NewString() - // localReadyEndpoints represents local endpoints that are ready and NOT terminating. - localReadyEndpoints := sets.NewString() - // localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating. - // Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic. - localReadyTerminatingEndpoints := sets.NewString() - curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer) if err != nil { klog.ErrorS(err, "Failed to list IPVS destinations") @@ -1978,32 +1970,17 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !ok { klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { - endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels) + clusterEndpoints, localEndpoints, _, _ := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels) + if onlyNodeLocalEndpoints { + endpoints = localEndpoints + } else { + endpoints = clusterEndpoints + } } + newEndpoints := sets.NewString() for _, epInfo := range endpoints { - if epInfo.IsReady() { - readyEndpoints.Insert(epInfo.String()) - } - - if onlyNodeLocalEndpoints && epInfo.GetIsLocal() { - if epInfo.IsReady() { - localReadyEndpoints.Insert(epInfo.String()) - } else if epInfo.IsServing() && epInfo.IsTerminating() { - localReadyTerminatingEndpoints.Insert(epInfo.String()) - } - } - } - - newEndpoints := readyEndpoints - if onlyNodeLocalEndpoints { - newEndpoints = localReadyEndpoints - - if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { - if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 { - newEndpoints = localReadyTerminatingEndpoints - } - } + newEndpoints.Insert(epInfo.String()) } // Create new endpoints diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 1d149adf149..71b141a3376 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -139,6 +139,24 @@ func (info *BaseServiceInfo) HintsAnnotation() string { return info.hintsAnnotation } +// ExternallyAccessible is part of ServicePort interface. +func (info *BaseServiceInfo) ExternallyAccessible() bool { + return info.nodePort != 0 || len(info.loadBalancerStatus.Ingress) != 0 || len(info.externalIPs) != 0 +} + +// UsesClusterEndpoints is part of ServicePort interface. +func (info *BaseServiceInfo) UsesClusterEndpoints() bool { + // The service port uses Cluster endpoints if the internal traffic policy is "Cluster", + // or if it accepts external traffic at all. (Even if the external traffic policy is + // "Local", we need Cluster endpoints to implement short circuiting.) + return !info.nodeLocalInternal || info.ExternallyAccessible() +} + +// UsesLocalEndpoints is part of ServicePort interface. +func (info *BaseServiceInfo) UsesLocalEndpoints() bool { + return info.nodeLocalInternal || (info.nodeLocalExternal && info.ExternallyAccessible()) +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { nodeLocalExternal := false if apiservice.RequestsOnlyLocalTraffic(service) { diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index ae7274c5b25..ef16d559b1c 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -23,78 +23,165 @@ import ( "k8s.io/kubernetes/pkg/features" ) -// FilterEndpoints filters endpoints based on Service configuration, node -// labels, and enabled feature gates. This is primarily used to enable topology -// aware routing. -func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) []Endpoint { - if svcInfo.NodeLocalExternal() { - return endpoints +// CategorizeEndpoints returns: +// +// - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if +// relevant). This will be nil if the service does not ever use Cluster traffic policy. +// +// - The service's usable Local-traffic-policy endpoints (including terminating endpoints, if +// relevant). This will be nil if the service does not ever use Local traffic policy. +// +// - The combined list of all endpoints reachable from this node (which is the union of the +// previous two lists, but in the case where it is identical to one or the other, we avoid +// allocating a separate list). +// +// - An indication of whether the service has any endpoints reachable from anywhere in the +// cluster. (This may be true even if allReachableEndpoints is empty.) +func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { + var useTopology, useServingTerminatingEndpoints bool + + if svcInfo.UsesClusterEndpoints() { + useTopology = canUseTopology(endpoints, svcInfo, nodeLabels) + clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { + if !ep.IsReady() { + return false + } + if useTopology && !availableForTopology(ep, nodeLabels) { + return false + } + return true + }) + + // If there are any Ready endpoints anywhere in the cluster, we are + // guaranteed to get one in clusterEndpoints. + if len(clusterEndpoints) > 0 { + hasAnyEndpoints = true + } } - if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() { - return FilterLocalEndpoint(endpoints) + if !svcInfo.UsesLocalEndpoints() { + allReachableEndpoints = clusterEndpoints + return } - if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { - return filterEndpointsWithHints(endpoints, svcInfo.HintsAnnotation(), nodeLabels) + // Pre-scan the endpoints, to figure out which type of endpoint Local + // traffic policy will use, and also to see if there are any usable + // endpoints anywhere in the cluster. + var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool + for _, ep := range endpoints { + if ep.IsReady() { + hasAnyEndpoints = true + if ep.GetIsLocal() { + hasLocalReadyEndpoints = true + } + } else if ep.IsServing() && ep.IsTerminating() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { + hasAnyEndpoints = true + if ep.GetIsLocal() { + hasLocalServingTerminatingEndpoints = true + } + } } - return endpoints + if hasLocalReadyEndpoints { + localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { + return ep.GetIsLocal() && ep.IsReady() + }) + } else if hasLocalServingTerminatingEndpoints { + useServingTerminatingEndpoints = true + localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { + return ep.GetIsLocal() && ep.IsServing() && ep.IsTerminating() + }) + } + + if !svcInfo.UsesClusterEndpoints() { + allReachableEndpoints = localEndpoints + return + } + + if !useTopology && !useServingTerminatingEndpoints { + // !useServingTerminatingEndpoints means that localEndpoints contains only + // Ready endpoints. !useTopology means that clusterEndpoints contains *every* + // Ready endpoint. So clusterEndpoints must be a superset of localEndpoints. + allReachableEndpoints = clusterEndpoints + return + } + + // clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while + // localEndpoints may contain terminating or topologically-unavailable local endpoints + // that aren't in clusterEndpoints. So we have to merge the two lists. + endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints)) + for _, ep := range clusterEndpoints { + endpointsMap[ep.String()] = ep + } + for _, ep := range localEndpoints { + endpointsMap[ep.String()] = ep + } + allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap)) + for _, ep := range endpointsMap { + allReachableEndpoints = append(allReachableEndpoints, ep) + } + + return } -// filterEndpointsWithHints provides filtering based on the hints included in -// EndpointSlices. If any of the following are true, the full list of endpoints -// will be returned without any filtering: -// * The AnnotationTopologyAwareHints annotation is not set to "Auto" for this -// Service. -// * No zone is specified in node labels. -// * No endpoints for this Service have a hint pointing to the zone this -// instance of kube-proxy is running in. -// * One or more endpoints for this Service do not have hints specified. -func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, nodeLabels map[string]string) []Endpoint { +// canUseTopology returns true if topology aware routing is enabled and properly configured +// in this cluster. That is, it checks that: +// * The TopologyAwareHints feature is enabled +// * The "service.kubernetes.io/topology-aware-hints" annotation on this Service is set to "Auto" +// * The node's labels include "topology.kubernetes.io/zone" +// * All of the endpoints for this Service have a topology hint +// * At least one endpoint for this Service is hinted for this node's zone. +func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool { + hintsAnnotation := svcInfo.HintsAnnotation() if hintsAnnotation != "Auto" && hintsAnnotation != "auto" { if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" { klog.InfoS("Skipping topology aware endpoint filtering since Service has unexpected value", "annotationTopologyAwareHints", v1.AnnotationTopologyAwareHints, "hints", hintsAnnotation) } - return endpoints + return false } zone, ok := nodeLabels[v1.LabelTopologyZone] if !ok || zone == "" { klog.InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone) - return endpoints + return false } - filteredEndpoints := []Endpoint{} - + hasEndpointForZone := false for _, endpoint := range endpoints { if !endpoint.IsReady() { continue } if endpoint.GetZoneHints().Len() == 0 { klog.InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint") - return endpoints + return false } + if endpoint.GetZoneHints().Has(zone) { - filteredEndpoints = append(filteredEndpoints, endpoint) + hasEndpointForZone = true } } - if len(filteredEndpoints) == 0 { + if !hasEndpointForZone { klog.InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone) - return endpoints + return false } - return filteredEndpoints + return true } -// FilterLocalEndpoint returns the node local endpoints -func FilterLocalEndpoint(endpoints []Endpoint) []Endpoint { - var filteredEndpoints []Endpoint +// availableForTopology checks if this endpoint is available for use on this node, given +// topology constraints. (It assumes that canUseTopology() returned true.) +func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool { + zone := nodeLabels[v1.LabelTopologyZone] + return endpoint.GetZoneHints().Has(zone) +} + +// filterEndpoints filters endpoints according to predicate +func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint { + filteredEndpoints := make([]Endpoint, 0, len(endpoints)) - // Get all the local endpoints for _, ep := range endpoints { - if ep.GetIsLocal() { + if predicate(ep) { filteredEndpoints = append(filteredEndpoints, ep) } } diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go index a151fc2b182..35c110815a2 100644 --- a/pkg/proxy/topology_test.go +++ b/pkg/proxy/topology_test.go @@ -45,74 +45,105 @@ func checkExpectedEndpoints(expected sets.String, actual []Endpoint) error { return kerrors.NewAggregate(errs) } -func TestFilterEndpoints(t *testing.T) { +func TestCategorizeEndpoints(t *testing.T) { testCases := []struct { - name string - hintsEnabled bool - nodeLabels map[string]string - serviceInfo ServicePort - endpoints []Endpoint - expectedEndpoints sets.String + name string + hintsEnabled bool + pteEnabled bool + nodeLabels map[string]string + serviceInfo ServicePort + endpoints []Endpoint + + // We distinguish `nil` ("service doesn't use this kind of endpoints") from + // `sets.String()` ("service uses this kind of endpoints but has no endpoints"). + // allEndpoints can be left unset if only one of clusterEndpoints and + // localEndpoints is set, and allEndpoints is identical to it. + // onlyRemoteEndpoints should be true if CategorizeEndpoints returns true for + // hasAnyEndpoints despite allEndpoints being empty. + clusterEndpoints sets.String + localEndpoints sets.String + allEndpoints sets.String + onlyRemoteEndpoints bool }{{ name: "hints enabled, hints annotation == auto", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "auto"}, + serviceInfo: &BaseServiceInfo{hintsAnnotation: "auto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "hints, hints annotation == disabled, hints ignored", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "disabled"}, + serviceInfo: &BaseServiceInfo{hintsAnnotation: "disabled"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "hints, hints annotation == aUto (wrong capitalization), hints ignored", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "aUto"}, + serviceInfo: &BaseServiceInfo{hintsAnnotation: "aUto"}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "hints, hints annotation empty, hints ignored", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{nodeLocalExternal: false}, + serviceInfo: &BaseServiceInfo{}, endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { - name: "node local endpoints, hints are ignored", + name: "externalTrafficPolicy: Local, topology ignored for Local endpoints", hintsEnabled: true, nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, - serviceInfo: &BaseServiceInfo{nodeLocalExternal: true, hintsAnnotation: "auto"}, + serviceInfo: &BaseServiceInfo{nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"}, endpoints: []Endpoint{ - &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, - &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + localEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80"), + allEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"), + }, { + name: "internalTrafficPolicy: Local, topology ignored for Local endpoints", + hintsEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, hintsAnnotation: "auto", nodeLocalExternal: false, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, + &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, + }, + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + localEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80"), + allEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"), }, { name: "empty node labels", hintsEnabled: true, @@ -121,7 +152,8 @@ func TestFilterEndpoints(t *testing.T) { endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80"), + localEndpoints: nil, }, { name: "empty zone label", hintsEnabled: true, @@ -130,7 +162,8 @@ func TestFilterEndpoints(t *testing.T) { endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80"), + localEndpoints: nil, }, { name: "node in different zone, no endpoint filtering", hintsEnabled: true, @@ -139,7 +172,8 @@ func TestFilterEndpoints(t *testing.T) { endpoints: []Endpoint{ &BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80"), + localEndpoints: nil, }, { name: "normal endpoint filtering, auto annotation", hintsEnabled: true, @@ -151,7 +185,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "unready endpoint", hintsEnabled: true, @@ -163,7 +198,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80"), + localEndpoints: nil, }, { name: "only unready endpoints in same zone (should not filter)", hintsEnabled: true, @@ -175,7 +211,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.4:80", "10.1.2.5:80"), + localEndpoints: nil, }, { name: "normal endpoint filtering, Auto annotation", hintsEnabled: true, @@ -187,7 +224,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "hintsAnnotation empty, no filtering applied", hintsEnabled: true, @@ -199,7 +237,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "hintsAnnotation disabled, no filtering applied", hintsEnabled: true, @@ -211,7 +250,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "missing hints, no filtering applied", hintsEnabled: true, @@ -223,7 +263,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: nil, Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "multiple hints per endpoint, filtering includes any endpoint with zone included", hintsEnabled: true, @@ -235,12 +276,28 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-b", "zone-d"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"), + localEndpoints: nil, }, { - name: "internalTrafficPolicy: Local, with empty endpoints", - serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, - endpoints: []Endpoint{}, - expectedEndpoints: nil, + name: "conflicting topology and localness require merging allEndpoints", + hintsEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.2:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.3:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"), + localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80", "10.0.0.2:80"), + }, { + name: "internalTrafficPolicy: Local, with empty endpoints", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, + endpoints: []Endpoint{}, + clusterEndpoints: nil, + localEndpoints: sets.NewString(), }, { name: "internalTrafficPolicy: Local, but all endpoints are remote", serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, @@ -248,7 +305,9 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, }, - expectedEndpoints: nil, + clusterEndpoints: nil, + localEndpoints: sets.NewString(), + onlyRemoteEndpoints: true, }, { name: "internalTrafficPolicy: Local, all endpoints are local", serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, @@ -256,7 +315,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, }, - expectedEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + clusterEndpoints: nil, + localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "internalTrafficPolicy: Local, some endpoints are local", serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, @@ -264,17 +324,186 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, }, - expectedEndpoints: sets.NewString("10.0.0.0:80"), + clusterEndpoints: nil, + localEndpoints: sets.NewString("10.0.0.0:80"), + }, { + name: "Cluster traffic policy, endpoints not Ready", + serviceInfo: &BaseServiceInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: nil, + }, { + name: "Cluster traffic policy, some endpoints are Ready", + serviceInfo: &BaseServiceInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true}, + }, + clusterEndpoints: sets.NewString("10.0.0.1:80"), + localEndpoints: nil, + }, { + name: "Cluster traffic policy, PTE enabled, all endpoints are terminating", + pteEnabled: true, + serviceInfo: &BaseServiceInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: nil, + }, { + name: "iTP: Local, eTP: Cluster, some endpoints local", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: false, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString("10.0.0.0:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Cluster, eTP: Local, some endpoints local", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString("10.0.0.0:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Local, eTP: Local, some endpoints local", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString("10.0.0.0:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Local, eTP: Local, all endpoints remote", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Local, eTP: Local, PTE disabled, all endpoints remote and terminating", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString(), + }, { + name: "iTP: Local, eTP: Local, PTE enabled, all endpoints remote and terminating", + pteEnabled: true, + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString(), + onlyRemoteEndpoints: true, + }, { + name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80"), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80"), + }, { + name: "iTP: Cluster, eTP: Local, PTE enabled, with terminating endpoints", + pteEnabled: true, + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80"), + localEndpoints: sets.NewString("10.0.0.2:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"), + }, { + name: "externalTrafficPolicy ignored if not externally accessible", + serviceInfo: &BaseServiceInfo{nodeLocalExternal: true}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: nil, + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "no cluster endpoints for iTP:Local internal-only service", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, + }, + clusterEndpoints: nil, + localEndpoints: sets.NewString("10.0.0.1:80"), + allEndpoints: sets.NewString("10.0.0.1:80"), }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, tc.pteEnabled)() - filteredEndpoints := FilterEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels) - err := checkExpectedEndpoints(tc.expectedEndpoints, filteredEndpoints) + clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels) + + if tc.clusterEndpoints == nil && clusterEndpoints != nil { + t.Errorf("expected no cluster endpoints but got %v", clusterEndpoints) + } else { + err := checkExpectedEndpoints(tc.clusterEndpoints, clusterEndpoints) + if err != nil { + t.Errorf("error with cluster endpoints: %v", err) + } + } + + if tc.localEndpoints == nil && localEndpoints != nil { + t.Errorf("expected no local endpoints but got %v", localEndpoints) + } else { + err := checkExpectedEndpoints(tc.localEndpoints, localEndpoints) + if err != nil { + t.Errorf("error with local endpoints: %v", err) + } + } + + var expectedAllEndpoints sets.String + if tc.clusterEndpoints != nil && tc.localEndpoints == nil { + expectedAllEndpoints = tc.clusterEndpoints + } else if tc.localEndpoints != nil && tc.clusterEndpoints == nil { + expectedAllEndpoints = tc.localEndpoints + } else { + expectedAllEndpoints = tc.allEndpoints + } + err := checkExpectedEndpoints(expectedAllEndpoints, allEndpoints) if err != nil { - t.Errorf(err.Error()) + t.Errorf("error with allEndpoints: %v", err) + } + + expectedHasAnyEndpoints := len(expectedAllEndpoints) > 0 || tc.onlyRemoteEndpoints + if expectedHasAnyEndpoints != hasAnyEndpoints { + t.Errorf("expected hasAnyEndpoints=%v, got %v", expectedHasAnyEndpoints, hasAnyEndpoints) } }) } diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index b7b6613f061..a68d36109a8 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -91,6 +91,15 @@ type ServicePort interface { InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType // HintsAnnotation returns the value of the v1.AnnotationTopologyAwareHints annotation. HintsAnnotation() string + // ExternallyAccessible returns true if the service port is reachable via something + // other than ClusterIP (NodePort/ExternalIP/LoadBalancer) + ExternallyAccessible() bool + // UsesClusterEndpoints returns true if the service port ever sends traffic to + // endpoints based on "Cluster" traffic policy + UsesClusterEndpoints() bool + // UsesLocalEndpoints returns true if the service port ever sends traffic to + // endpoints based on "Local" traffic policy + UsesLocalEndpoints() bool } // Endpoint in an interface which abstracts information about an endpoint.