diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a7c673e2ff9..3d372a779a9 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -38,11 +38,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/events" utilsysctl "k8s.io/component-helpers/node/util/sysctl" "k8s.io/klog/v2" - "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/proxy" "k8s.io/kubernetes/pkg/proxy/healthcheck" "k8s.io/kubernetes/pkg/proxy/metaproxier" @@ -119,6 +117,7 @@ type serviceInfo struct { // The following fields are computed and stored for performance reasons. serviceNameString string servicePortChainName utiliptables.Chain + serviceLocalChainName utiliptables.Chain serviceFirewallChainName utiliptables.Chain serviceLBChainName utiliptables.Chain } @@ -133,6 +132,7 @@ func newServiceInfo(port *v1.ServicePort, service *v1.Service, baseInfo *proxy.B protocol := strings.ToLower(string(info.Protocol())) info.serviceNameString = svcPortName.String() info.servicePortChainName = servicePortChainName(info.serviceNameString, protocol) + info.serviceLocalChainName = serviceLocalChainName(info.serviceNameString, protocol) info.serviceFirewallChainName = serviceFirewallChainName(info.serviceNameString, protocol) info.serviceLBChainName = serviceLBChainName(info.serviceNameString, protocol) @@ -436,7 +436,7 @@ func CleanupLeftovers(ipt utiliptables.Interface) (encounteredError bool) { // Hunt for service and endpoint chains. for chain := range existingNATChains { chainString := string(chain) - if strings.HasPrefix(chainString, "KUBE-SVC-") || strings.HasPrefix(chainString, "KUBE-SEP-") || strings.HasPrefix(chainString, "KUBE-FW-") || strings.HasPrefix(chainString, "KUBE-XLB-") { + if isServiceChainName(chainString) { natChains.WriteBytes(existingNATChains[chain]) // flush natRules.Write("-X", chainString) // delete } @@ -683,34 +683,65 @@ func portProtoHash(servicePortName string, protocol string) string { return encoded[:16] } -// servicePortChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-SVC-". +const ( + servicePortChainNamePrefix = "KUBE-SVC-" + serviceLocalChainNamePrefix = "KUBE-SVL-" + serviceFirewallChainNamePrefix = "KUBE-FW-" + serviceLBChainNamePrefix = "KUBE-XLB-" + servicePortEndpointChainNamePrefix = "KUBE-SEP-" +) + +// servicePortChainName returns the name of the KUBE-SVC-XXXX chain for a service, which is the +// main iptables chain for that service, used for dispatching to endpoints when using `Cluster` +// traffic policy. func servicePortChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-SVC-" + portProtoHash(servicePortName, protocol)) + return utiliptables.Chain(servicePortChainNamePrefix + portProtoHash(servicePortName, protocol)) } -// serviceFirewallChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-FW-". +// serviceLocalChainName returns the name of the KUBE-SVL-XXXX chain for a service, which +// handles dispatching to local endpoints when using `Local` traffic policy. This chain only +// exists if the service has `Local` internal or external traffic policy. 
+func serviceLocalChainName(servicePortName string, protocol string) utiliptables.Chain { + return utiliptables.Chain(serviceLocalChainNamePrefix + portProtoHash(servicePortName, protocol)) +} + +// serviceFirewallChainName returns the name of the KUBE-FW-XXXX chain for a service, which +// is used to implement the filtering for the LoadBalancerSourceRanges feature. func serviceFirewallChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-FW-" + portProtoHash(servicePortName, protocol)) + return utiliptables.Chain(serviceFirewallChainNamePrefix + portProtoHash(servicePortName, protocol)) } -// serviceLBPortChainName takes the ServicePortName for a service and -// returns the associated iptables chain. This is computed by hashing (sha256) -// then encoding to base32 and truncating with the prefix "KUBE-XLB-". We do -// this because IPTables Chain Names must be <= 28 chars long, and the longer -// they are the harder they are to read. +// serviceLBChainName returns the name of the KUBE-XLB-XXXX chain for a service, which +// implements "short-circuiting" for internally-originated load balancer traffic when using +// `Local` external traffic policy. It forwards traffic from local sources to the KUBE-SVC-XXXX +// chain and traffic from external sources to the KUBE-SVL-XXXX chain. func serviceLBChainName(servicePortName string, protocol string) utiliptables.Chain { - return utiliptables.Chain("KUBE-XLB-" + portProtoHash(servicePortName, protocol)) + return utiliptables.Chain(serviceLBChainNamePrefix + portProtoHash(servicePortName, protocol)) } -// This is the same as servicePortChainName but with the endpoint included. +// servicePortEndpointChainName returns the name of the KUBE-SEP-XXXX chain for a particular +// service endpoint. func servicePortEndpointChainName(servicePortName string, protocol string, endpoint string) utiliptables.Chain { hash := sha256.Sum256([]byte(servicePortName + protocol + endpoint)) encoded := base32.StdEncoding.EncodeToString(hash[:]) - return utiliptables.Chain("KUBE-SEP-" + encoded[:16]) + return utiliptables.Chain(servicePortEndpointChainNamePrefix + encoded[:16]) +} + +func isServiceChainName(chainString string) bool { + prefixes := []string{ + servicePortChainNamePrefix, + serviceLocalChainNamePrefix, + servicePortEndpointChainNamePrefix, + serviceFirewallChainNamePrefix, + serviceLBChainNamePrefix, + } + + for _, p := range prefixes { + if strings.HasPrefix(chainString, p) { + return true + } + } + return false } // After a UDP or SCTP endpoint has been removed, we must flush any pending conntrack entries to it, or else we @@ -931,15 +962,6 @@ func (proxier *Proxier) syncProxyRules() { // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set - // We are creating those slices ones here to avoid memory reallocations - // in every loop. Note that reuse the memory, instead of doing: - // slice = - // you should always do one of the below: - // slice = slice[:0] // and then append to it - // slice = append(slice[:0], ...) - readyEndpointChains := make([]utiliptables.Chain, 0) - localEndpointChains := make([]utiliptables.Chain, 0) - // To avoid growing this slice, we arbitrarily set its size to 64, // there is never more than that many arguments for a single line. 
// Note that even if we go over 64, it will still be correct - it @@ -980,36 +1002,14 @@ func (proxier *Proxier) syncProxyRules() { allEndpoints := proxier.endpointsMap[svcName] - // Filtering for topology aware endpoints. This function will only - // filter endpoints if appropriate feature gates are enabled and the - // Service does not have conflicting configuration such as - // externalTrafficPolicy=Local. - allEndpoints = proxy.FilterEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) - - // Scan the endpoints list to see what we have. "hasEndpoints" will be true - // if there are any usable endpoints for this service anywhere in the cluster. - var hasEndpoints, hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool - for _, ep := range allEndpoints { - if ep.IsReady() { - hasEndpoints = true - if ep.GetIsLocal() { - hasLocalReadyEndpoints = true - } - } else if svc.NodeLocalExternal() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { - if ep.IsServing() && ep.IsTerminating() { - hasEndpoints = true - if ep.GetIsLocal() { - hasLocalServingTerminatingEndpoints = true - } - } - } - } - useTerminatingEndpoints := !hasLocalReadyEndpoints && hasLocalServingTerminatingEndpoints + // Figure out the endpoints for Cluster and Local traffic policy. + // allLocallyReachableEndpoints is the set of all endpoints that can be routed to + // from this node, given the service's traffic policies. hasEndpoints is true + // if the service has any usable endpoints on any node, not just this one. + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) // Generate the per-endpoint chains. - readyEndpointChains = readyEndpointChains[:0] - localEndpointChains = localEndpointChains[:0] - for _, ep := range allEndpoints { + for _, ep := range allLocallyReachableEndpoints { epInfo, ok := ep.(*endpointsInfo) if !ok { klog.ErrorS(err, "Failed to cast endpointsInfo", "endpointsInfo", ep) @@ -1017,27 +1017,6 @@ func (proxier *Proxier) syncProxyRules() { } endpointChain := epInfo.ChainName - endpointInUse := false - - if epInfo.Ready { - readyEndpointChains = append(readyEndpointChains, endpointChain) - endpointInUse = true - } - if svc.NodeLocalExternal() && epInfo.IsLocal { - if useTerminatingEndpoints { - if epInfo.Serving && epInfo.Terminating { - localEndpointChains = append(localEndpointChains, endpointChain) - endpointInUse = true - } - } else if epInfo.Ready { - localEndpointChains = append(localEndpointChains, endpointChain) - endpointInUse = true - } - } - - if !endpointInUse { - continue - } // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[endpointChain]; ok { @@ -1063,27 +1042,80 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Write(args) } - svcChain := svcInfo.servicePortChainName - if hasEndpoints { - // Create the per-service chain, retaining counters if possible. 
-		if chain, ok := existingNATChains[svcChain]; ok {
-			proxier.natChains.WriteBytes(chain)
-		} else {
-			proxier.natChains.Write(utiliptables.MakeChainLine(svcChain))
-		}
-		activeNATChains[svcChain] = true
+		policyClusterChain := svcInfo.servicePortChainName
+		policyLocalChain := svcInfo.serviceLocalChainName
+		svcXlbChain := svcInfo.serviceLBChainName
+
+		internalTrafficChain := policyClusterChain
+		externalTrafficChain := policyClusterChain
+
+		if svcInfo.NodeLocalInternal() {
+			internalTrafficChain = policyLocalChain
+		}
+		if svcInfo.NodeLocalExternal() {
+			externalTrafficChain = svcXlbChain
		}

-		svcXlbChain := svcInfo.serviceLBChainName
-		if hasEndpoints && svcInfo.NodeLocalExternal() {
-			// Only for services request OnlyLocal traffic
-			// create the per-service LB chain, retaining counters if possible.
-			if lbChain, ok := existingNATChains[svcXlbChain]; ok {
-				proxier.natChains.WriteBytes(lbChain)
+		if hasEndpoints && svcInfo.UsesClusterEndpoints() {
+			// Create the Cluster traffic policy chain, retaining counters if possible.
+			if chain, ok := existingNATChains[policyClusterChain]; ok {
+				proxier.natChains.WriteBytes(chain)
+			} else {
+				proxier.natChains.Write(utiliptables.MakeChainLine(policyClusterChain))
+			}
+			activeNATChains[policyClusterChain] = true
+		}
+
+		if hasEndpoints && svcInfo.ExternallyAccessible() && svcInfo.NodeLocalExternal() {
+			if chain, ok := existingNATChains[svcXlbChain]; ok {
+				proxier.natChains.WriteBytes(chain)
 			} else {
 				proxier.natChains.Write(utiliptables.MakeChainLine(svcXlbChain))
 			}
 			activeNATChains[svcXlbChain] = true
+
+			// The XLB chain redirects all pod -> external VIP
+			// traffic to the Service's ClusterIP instead. This happens
+			// whether or not we have local endpoints, but only if
+			// localDetector is implemented.
+			if proxier.localDetector.IsImplemented() {
+				proxier.natRules.Write(
+					"-A", string(svcXlbChain),
+					"-m", "comment", "--comment",
+					`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
+					proxier.localDetector.IfLocal(),
+					"-j", string(policyClusterChain))
+			}
+
+			// Next, redirect all src-type=LOCAL -> LB IP to the service chain
+			// for externalTrafficPolicy=Local. This allows traffic originating
+			// from the host to be redirected to the service correctly;
+			// otherwise traffic to LB IPs is dropped if there are no local
+			// endpoints.
+			proxier.natRules.Write(
+				"-A", string(svcXlbChain),
+				"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
+				"-m", "addrtype", "--src-type", "LOCAL",
+				"-j", string(KubeMarkMasqChain))
+			proxier.natRules.Write(
+				"-A", string(svcXlbChain),
+				"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
+				"-m", "addrtype", "--src-type", "LOCAL",
+				"-j", string(policyClusterChain))
+
+			// Everything else goes to the SVL chain.
+			proxier.natRules.Write(
+				"-A", string(svcXlbChain),
+				"-j", string(policyLocalChain))
+		}
+
+		if hasEndpoints && svcInfo.UsesLocalEndpoints() {
+			if chain, ok := existingNATChains[policyLocalChain]; ok {
+				proxier.natChains.WriteBytes(chain)
+			} else {
+				proxier.natChains.Write(utiliptables.MakeChainLine(policyLocalChain))
+			}
+			activeNATChains[policyLocalChain] = true
 		}

 		// Capture the clusterIP.
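// Editor's aside (illustrative sketch, not part of the patch): the hunk above
// reduces to a small decision table. dispatchChains below is a hypothetical
// helper using placeholder chain names; in the real code the names come from
// servicePortChainName, serviceLocalChainName, and serviceLBChainName.
func dispatchChains(nodeLocalInternal, nodeLocalExternal bool) (internal, external string) {
	// Both traffic directions default to the main KUBE-SVC- ("Cluster") chain.
	internal, external = "KUBE-SVC-XXXX", "KUBE-SVC-XXXX"
	if nodeLocalInternal {
		// internalTrafficPolicy=Local dispatches directly to local endpoints.
		internal = "KUBE-SVL-XXXX"
	}
	if nodeLocalExternal {
		// externalTrafficPolicy=Local jumps to KUBE-XLB-, which short-circuits
		// node-local sources back to KUBE-SVC- and sends external sources on
		// to KUBE-SVL-.
		external = "KUBE-XLB-XXXX"
	}
	return internal, external
}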
@@ -1096,7 +1128,7 @@ func (proxier *Proxier) syncProxyRules() { ) if proxier.masqueradeAll { proxier.natRules.Write( - "-A", string(svcChain), + "-A", string(internalTrafficChain), args, "-j", string(KubeMarkMasqChain)) } else if proxier.localDetector.IsImplemented() { @@ -1104,9 +1136,8 @@ func (proxier *Proxier) syncProxyRules() { // is that you can establish a static route for your Service range, // routing to any node, and that node will bridge into the Service // for you. Since that might bounce off-node, we masquerade here. - // If/when we support "Local" policy for VIPs, we should update this. proxier.natRules.Write( - "-A", string(svcChain), + "-A", string(internalTrafficChain), args, proxier.localDetector.IfNotLocal(), "-j", string(KubeMarkMasqChain)) @@ -1114,7 +1145,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Write( "-A", string(kubeServicesChain), args, - "-j", string(svcChain)) + "-j", string(internalTrafficChain)) } else { // No endpoints. proxier.filterRules.Write( @@ -1137,14 +1168,12 @@ func (proxier *Proxier) syncProxyRules() { "--dport", strconv.Itoa(svcInfo.Port()), ) - destChain := svcXlbChain // We have to SNAT packets to external IPs if externalTrafficPolicy is cluster // and the traffic is NOT Local. Local traffic coming from Pods and Nodes will // be always forwarded to the corresponding Service, so no need to SNAT // If we can't differentiate the local traffic we always SNAT. if !svcInfo.NodeLocalExternal() { - appendTo := []string{"-A", string(svcChain)} - destChain = svcChain + appendTo := []string{"-A", string(policyClusterChain)} // This masquerades off-cluster traffic to a External IP. if proxier.localDetector.IsImplemented() { proxier.natRules.Write( @@ -1163,7 +1192,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Write( "-A", string(kubeServicesChain), args, - "-j", string(destChain)) + "-j", string(externalTrafficChain)) } else { // No endpoints. @@ -1208,23 +1237,20 @@ func (proxier *Proxier) syncProxyRules() { "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), ) - // Each source match rule in the FW chain may jump to either the SVC or the XLB chain - chosenChain := svcXlbChain // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. 
if !svcInfo.NodeLocalExternal() { proxier.natRules.Write(args, "-j", string(KubeMarkMasqChain)) - chosenChain = svcChain } if len(svcInfo.LoadBalancerSourceRanges()) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain - proxier.natRules.Write(args, "-j", string(chosenChain)) + proxier.natRules.Write(args, "-j", string(externalTrafficChain)) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges() { - proxier.natRules.Write(args, "-s", src, "-j", string(chosenChain)) + proxier.natRules.Write(args, "-s", src, "-j", string(externalTrafficChain)) _, cidr, err := netutils.ParseCIDRSloppy(src) if err != nil { klog.ErrorS(err, "Error parsing CIDR in LoadBalancerSourceRanges, dropping it", "cidr", cidr) @@ -1239,7 +1265,7 @@ func (proxier *Proxier) syncProxyRules() { proxier.natRules.Write( args, "-s", ingress, - "-j", string(chosenChain)) + "-j", string(externalTrafficChain)) } } @@ -1263,7 +1289,6 @@ func (proxier *Proxier) syncProxyRules() { // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. if svcInfo.NodePort() != 0 && len(nodeAddresses) != 0 { - if hasEndpoints { args = append(args[:0], "-m", "comment", "--comment", svcNameString, @@ -1273,14 +1298,9 @@ func (proxier *Proxier) syncProxyRules() { if !svcInfo.NodeLocalExternal() { // Nodeports need SNAT, unless they're local. proxier.natRules.Write( - "-A", string(svcChain), + "-A", string(policyClusterChain), args, "-j", string(KubeMarkMasqChain)) - // Jump to the service chain. - proxier.natRules.Write( - "-A", string(kubeNodePortsChain), - args, - "-j", string(svcChain)) } else { // TODO: Make all nodePorts jump to the firewall chain. // Currently we only create it for loadbalancers (#33586). @@ -1290,16 +1310,16 @@ func (proxier *Proxier) syncProxyRules() { if isIPv6 { loopback = "::1/128" } - appendTo := []string{"-A", string(kubeNodePortsChain)} proxier.natRules.Write( - appendTo, + "-A", string(kubeNodePortsChain), args, "-s", loopback, "-j", string(KubeMarkMasqChain)) - proxier.natRules.Write( - appendTo, - args, - "-j", string(svcXlbChain)) } + // Jump to the service chain. + proxier.natRules.Write( + "-A", string(kubeNodePortsChain), + args, + "-j", string(externalTrafficChain)) } else { // No endpoints. proxier.filterRules.Write( @@ -1326,57 +1346,26 @@ func (proxier *Proxier) syncProxyRules() { ) } - if !hasEndpoints { - continue + if svcInfo.UsesClusterEndpoints() { + // Write rules jumping from policyClusterChain to clusterEndpoints + proxier.writeServiceToEndpointRules(svcNameString, svcInfo, policyClusterChain, clusterEndpoints, args) } - // Write rules jumping from svcChain to readyEndpointChains - proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcChain, readyEndpointChains, args) - - // The logic below this applies only if this service is marked as OnlyLocal - if !svcInfo.NodeLocalExternal() { - continue - } - - // First rule in the chain redirects all pod -> external VIP traffic to the - // Service's ClusterIP instead. 
This happens whether or not we have local
-		// endpoints; only if localDetector is implemented
-		if proxier.localDetector.IsImplemented() {
-			proxier.natRules.Write(
-				"-A", string(svcXlbChain),
-				"-m", "comment", "--comment",
-				`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`,
-				proxier.localDetector.IfLocal(),
-				"-j", string(svcChain))
-		}
-
-		// Next, redirect all src-type=LOCAL -> LB IP to the service chain for externalTrafficPolicy=Local
-		// This allows traffic originating from the host to be redirected to the service correctly,
-		// otherwise traffic to LB IPs are dropped if there are no local endpoints.
-		args = append(args[:0], "-A", string(svcXlbChain))
-		proxier.natRules.Write(
-			args,
-			"-m", "comment", "--comment", fmt.Sprintf(`"masquerade LOCAL traffic for %s LB IP"`, svcNameString),
-			"-m", "addrtype", "--src-type", "LOCAL", "-j", string(KubeMarkMasqChain))
-		proxier.natRules.Write(
-			args,
-			"-m", "comment", "--comment", fmt.Sprintf(`"route LOCAL traffic for %s LB IP to service chain"`, svcNameString),
-			"-m", "addrtype", "--src-type", "LOCAL", "-j", string(svcChain))
-
-		numLocalEndpoints := len(localEndpointChains)
-		if numLocalEndpoints == 0 {
-			// Blackhole all traffic since there are no local endpoints
-			args = append(args[:0],
-				"-A", string(svcXlbChain),
-				"-m", "comment", "--comment",
-				fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
-				"-j",
-				string(KubeMarkDropChain),
-			)
-			proxier.natRules.Write(args)
-		} else {
-			// Write rules jumping from svcXlbChain to localEndpointChains
-			proxier.writeServiceToEndpointRules(svcNameString, svcInfo, svcXlbChain, localEndpointChains, args)
+		if svcInfo.UsesLocalEndpoints() {
+			if len(localEndpoints) != 0 {
+				// Write rules jumping from policyLocalChain to localEndpoints
+				proxier.writeServiceToEndpointRules(svcNameString, svcInfo, policyLocalChain, localEndpoints, args)
+			} else if hasEndpoints {
+				// Blackhole all traffic since there are no local endpoints
+				args = append(args[:0],
+					"-A", string(policyLocalChain),
+					"-m", "comment", "--comment",
+					fmt.Sprintf(`"%s has no local endpoints"`, svcNameString),
+					"-j",
+					string(KubeMarkDropChain),
+				)
+				proxier.natRules.Write(args)
+			}
 		}
 	}
@@ -1384,7 +1373,7 @@ func (proxier *Proxier) syncProxyRules() {
 	for chain := range existingNATChains {
 		if !activeNATChains[chain] {
 			chainString := string(chain)
-			if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") {
+			if !isServiceChainName(chainString) {
 				// Ignore chains that aren't ours.
 				continue
 			}
@@ -1535,27 +1524,35 @@ func (proxier *Proxier) syncProxyRules() {
 	proxier.deleteEndpointConnections(endpointUpdateResult.StaleEndpoints)
 }

-func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpointChains []utiliptables.Chain, args []string) {
+func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInfo proxy.ServicePort, svcChain utiliptables.Chain, endpoints []proxy.Endpoint, args []string) {
 	// First write session affinity rules, if applicable.
if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { - for _, endpointChain := range endpointChains { + for _, ep := range endpoints { + epInfo, ok := ep.(*endpointsInfo) + if !ok { + continue + } args = append(args[:0], "-A", string(svcChain), ) args = proxier.appendServiceCommentLocked(args, svcNameString) args = append(args, - "-m", "recent", "--name", string(endpointChain), + "-m", "recent", "--name", string(epInfo.ChainName), "--rcheck", "--seconds", strconv.Itoa(svcInfo.StickyMaxAgeSeconds()), "--reap", - "-j", string(endpointChain), + "-j", string(epInfo.ChainName), ) proxier.natRules.Write(args) } } // Now write loadbalancing rules. - numEndpoints := len(endpointChains) - for i, endpointChain := range endpointChains { - // Balancing rules in the per-service chain. + numEndpoints := len(endpoints) + for i, ep := range endpoints { + epInfo, ok := ep.(*endpointsInfo) + if !ok { + continue + } + args = append(args[:0], "-A", string(svcChain)) args = proxier.appendServiceCommentLocked(args, svcNameString) if i < (numEndpoints - 1) { @@ -1566,6 +1563,6 @@ func (proxier *Proxier) writeServiceToEndpointRules(svcNameString string, svcInf "--probability", proxier.probability(numEndpoints-i)) } // The final (or only if n == 1) rule is a guaranteed match. - proxier.natRules.Write(args, "-j", string(endpointChain)) + proxier.natRules.Write(args, "-j", string(epInfo.ChainName)) } } diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index 607b73b867e..543abcd24f5 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -995,6 +995,7 @@ COMMIT :KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0] :KUBE-SVC-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-XLB-GNZBNJ2PO5MGZ6GT - [0:0] +:KUBE-SVL-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-FW-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-SEP-RS4RBKLTHTF2IUXJ - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] @@ -1025,7 +1026,8 @@ COMMIT -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "masquerade LOCAL traffic for ns2/svc2:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "route LOCAL traffic for ns2/svc2:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-GNZBNJ2PO5MGZ6GT --A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP +-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -j KUBE-SVL-GNZBNJ2PO5MGZ6GT +-A KUBE-SVL-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP -A KUBE-SERVICES -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-X27LE4BHSL4DOUIK -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment "ns3/svc3:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-NODEPORTS -m comment --comment ns3/svc3:p80 -m tcp -p tcp --dport 3002 -j KUBE-SVC-X27LE4BHSL4DOUIK @@ -1072,6 +1074,7 @@ COMMIT :KUBE-SVC-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-SVC-X27LE4BHSL4DOUIK - [0:0] :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] +:KUBE-SVL-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-XLB-GNZBNJ2PO5MGZ6GT - [0:0] -A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp --dport 3001 -s 127.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-NODEPORTS -m comment --comment ns2/svc2:p80 -m tcp -p tcp --dport 3001 -j KUBE-XLB-GNZBNJ2PO5MGZ6GT @@ -1111,10 +1114,11 @@ COMMIT -A KUBE-SVC-X27LE4BHSL4DOUIK -m comment --comment ns3/svc3:p80 -j KUBE-SEP-OYPFS5VJICHGATKP -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment "ns1/svc1:p80 cluster IP" -m tcp -p tcp -d 172.30.0.41 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SVC-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-SXIVWICOYRO3J4NJ +-A KUBE-SVL-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "masquerade LOCAL traffic for ns2/svc2:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "route LOCAL traffic for ns2/svc2:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-GNZBNJ2PO5MGZ6GT --A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP +-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -j KUBE-SVL-GNZBNJ2PO5MGZ6GT COMMIT `, }, @@ -1514,6 +1518,7 @@ COMMIT :KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0] :KUBE-SVC-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-XLB-GNZBNJ2PO5MGZ6GT - [0:0] +:KUBE-SVL-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-FW-GNZBNJ2PO5MGZ6GT - [0:0] :KUBE-SEP-RS4RBKLTHTF2IUXJ - [0:0] :KUBE-SVC-PAZTZYUUMV5KCDZL - [0:0] @@ -1547,7 +1552,8 @@ COMMIT -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-GNZBNJ2PO5MGZ6GT -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "masquerade LOCAL traffic for ns2/svc2:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "route LOCAL traffic for ns2/svc2:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-GNZBNJ2PO5MGZ6GT --A KUBE-XLB-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP +-A KUBE-XLB-GNZBNJ2PO5MGZ6GT -j KUBE-SVL-GNZBNJ2PO5MGZ6GT +-A KUBE-SVL-GNZBNJ2PO5MGZ6GT -m comment --comment "ns2/svc2:p80 has no local endpoints" -j KUBE-MARK-DROP -A KUBE-SVC-PAZTZYUUMV5KCDZL -m comment --comment "ns2b/svc2b:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ -A KUBE-SERVICES -m comment --comment "ns2b/svc2b:p80 cluster IP" -m tcp -p tcp -d 172.30.0.43 --dport 80 -j KUBE-SVC-PAZTZYUUMV5KCDZL -A KUBE-SERVICES -m comment --comment "ns2b/svc2b:p80 loadbalancer IP" -m tcp -p tcp -d 5.6.7.8 --dport 80 -j KUBE-FW-PAZTZYUUMV5KCDZL @@ -2100,6 +2106,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] +:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0] :KUBE-XLB-XPGD46QRK7WJZT7O - [0:0] :KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0] :KUBE-SEP-ZX7GRIZKSNUQ3LAJ - [0:0] @@ -2119,7 +2126,8 @@ COMMIT -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O --A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ +-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O +-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT ` @@ -2412,6 +2420,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] +:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0] :KUBE-XLB-XPGD46QRK7WJZT7O - [0:0] :KUBE-FW-XPGD46QRK7WJZT7O - [0:0] :KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0] @@ -2438,8 +2447,9 @@ COMMIT -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O --A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -m recent --name KUBE-SEP-ZX7GRIZKSNUQ3LAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ --A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ +-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O +-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -m recent --name KUBE-SEP-ZX7GRIZKSNUQ3LAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ +-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT ` @@ -2469,6 +2479,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] +:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0] :KUBE-XLB-XPGD46QRK7WJZT7O - [0:0] :KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0] :KUBE-SEP-ZX7GRIZKSNUQ3LAJ - [0:0] @@ -2487,7 +2498,8 @@ COMMIT -A KUBE-SEP-ZX7GRIZKSNUQ3LAJ -m comment --comment ns1/svc1:p80 -m tcp -p tcp -j DNAT --to-destination 10.180.2.1:80 -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A 
KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O --A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ +-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O +-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -d 192.168.0.2 -j KUBE-NODEPORTS COMMIT ` @@ -2515,6 +2527,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-XPGD46QRK7WJZT7O - [0:0] +:KUBE-SVL-XPGD46QRK7WJZT7O - [0:0] :KUBE-XLB-XPGD46QRK7WJZT7O - [0:0] :KUBE-SEP-SXIVWICOYRO3J4NJ - [0:0] :KUBE-SEP-ZX7GRIZKSNUQ3LAJ - [0:0] @@ -2535,7 +2548,8 @@ COMMIT -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-XPGD46QRK7WJZT7O -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "masquerade LOCAL traffic for ns1/svc1:p80 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment "route LOCAL traffic for ns1/svc1:p80 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-XPGD46QRK7WJZT7O --A KUBE-XLB-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ +-A KUBE-XLB-XPGD46QRK7WJZT7O -j KUBE-SVL-XPGD46QRK7WJZT7O +-A KUBE-SVL-XPGD46QRK7WJZT7O -m comment --comment ns1/svc1:p80 -j KUBE-SEP-ZX7GRIZKSNUQ3LAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -d 192.168.0.2 -j KUBE-NODEPORTS COMMIT ` @@ -3890,6 +3904,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] :KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] @@ -3914,7 +3929,8 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4 -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT ` @@ -4416,15 +4432,15 @@ COMMIT :KUBE-NODEPORTS - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] -:KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ --A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-SVC-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! -s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-3JOIVZTXZZRGORX4 -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -s 10.0.1.1 -j KUBE-MARK-MASQ -A KUBE-SEP-3JOIVZTXZZRGORX4 -m comment --comment ns1/svc1 -m tcp -p tcp -j DNAT --to-destination 10.0.1.1:80 -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS @@ -4447,7 +4463,6 @@ COMMIT :KUBE-EXTERNAL-SERVICES - [0:0] :KUBE-FORWARD - [0:0] :KUBE-NODEPORTS - [0:0] --A KUBE-SERVICES -m comment --comment "ns1/svc1 has no endpoints" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j REJECT -A KUBE-FORWARD -m conntrack --ctstate INVALID -j DROP -A KUBE-FORWARD -m comment --comment "kubernetes forwarding rules" -m mark --mark 0x4000/0x4000 -j ACCEPT -A KUBE-FORWARD -m comment --comment "kubernetes forwarding conntrack rule" -m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT @@ -4457,10 +4472,14 @@ COMMIT :KUBE-NODEPORTS - [0:0] :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN -A KUBE-POSTROUTING -j MARK --xor-mark 0x4000 -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -j MASQUERADE -A KUBE-MARK-MASQ -j MARK --or-mark 0x4000 +-A KUBE-SERVICES -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 cluster IP" -m tcp -p tcp -d 172.30.1.1 --dport 80 ! 
-s 10.0.0.0/8 -j KUBE-MARK-MASQ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT `, @@ -4666,6 +4685,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] :KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] @@ -4695,10 +4715,11 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4 --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4 --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT `, @@ -4786,6 +4807,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] :KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SEP-3JOIVZTXZZRGORX4 - [0:0] @@ -4815,10 +4837,11 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4 --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment 
--comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4 --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-3JOIVZTXZZRGORX4 --rcheck --seconds 10800 --reap -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-3JOIVZTXZZRGORX4 +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-IO5XOSKPAXIFQXAJ -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT `, @@ -4898,6 +4921,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] :KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SEP-IO5XOSKPAXIFQXAJ - [0:0] @@ -4923,10 +4947,11 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --rcheck --seconds 10800 --reap -j KUBE-SEP-XGJFVO3L2O5SRFNT --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT +-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-IO5XOSKPAXIFQXAJ --rcheck --seconds 10800 --reap -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m recent --name KUBE-SEP-XGJFVO3L2O5SRFNT --rcheck --seconds 10800 --reap -j KUBE-SEP-XGJFVO3L2O5SRFNT +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -m statistic --mode random --probability 0.5000000000 -j KUBE-SEP-IO5XOSKPAXIFQXAJ +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment ns1/svc1 -j KUBE-SEP-XGJFVO3L2O5SRFNT -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT `, @@ -5011,6 +5036,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] :KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] :KUBE-SEP-EQCHZ7S2PJ72OHAY - [0:0] @@ -5030,7 +5056,8 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A 
KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT `, @@ -5081,6 +5108,7 @@ COMMIT :KUBE-POSTROUTING - [0:0] :KUBE-MARK-MASQ - [0:0] :KUBE-SVC-AQI2S6QIMU7PVVRP - [0:0] +:KUBE-SVL-AQI2S6QIMU7PVVRP - [0:0] :KUBE-XLB-AQI2S6QIMU7PVVRP - [0:0] :KUBE-FW-AQI2S6QIMU7PVVRP - [0:0] -A KUBE-POSTROUTING -m mark ! --mark 0x4000/0x4000 -j RETURN @@ -5095,7 +5123,8 @@ COMMIT -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "Redirect pods trying to reach external loadbalancer VIP to clusterIP" -s 10.0.0.0/8 -j KUBE-SVC-AQI2S6QIMU7PVVRP -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "masquerade LOCAL traffic for ns1/svc1 LB IP" -m addrtype --src-type LOCAL -j KUBE-MARK-MASQ -A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "route LOCAL traffic for ns1/svc1 LB IP to service chain" -m addrtype --src-type LOCAL -j KUBE-SVC-AQI2S6QIMU7PVVRP --A KUBE-XLB-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP +-A KUBE-XLB-AQI2S6QIMU7PVVRP -j KUBE-SVL-AQI2S6QIMU7PVVRP +-A KUBE-SVL-AQI2S6QIMU7PVVRP -m comment --comment "ns1/svc1 has no local endpoints" -j KUBE-MARK-DROP -A KUBE-SERVICES -m comment --comment "kubernetes service nodeports; NOTE: this must be the last rule in this chain" -m addrtype --dst-type LOCAL -j KUBE-NODEPORTS COMMIT `, diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index e424fe6e499..a6aeac3c0b2 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1951,14 +1951,6 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode // curEndpoints represents IPVS destinations listed from current system. curEndpoints := sets.NewString() - // readyEndpoints represents Endpoints watched from API Server. - readyEndpoints := sets.NewString() - // localReadyEndpoints represents local endpoints that are ready and NOT terminating. - localReadyEndpoints := sets.NewString() - // localReadyTerminatingEndpoints represents local endpoints that are ready AND terminating. - // Fall back to these endpoints if no non-terminating ready endpoints exist for node-local traffic. 
- localReadyTerminatingEndpoints := sets.NewString() - curDests, err := proxier.ipvs.GetRealServers(appliedVirtualServer) if err != nil { klog.ErrorS(err, "Failed to list IPVS destinations") @@ -1978,32 +1970,17 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !ok { klog.InfoS("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { - endpoints = proxy.FilterEndpoints(endpoints, svcInfo, proxier.nodeLabels) + clusterEndpoints, localEndpoints, _, _ := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels) + if onlyNodeLocalEndpoints { + endpoints = localEndpoints + } else { + endpoints = clusterEndpoints + } } + newEndpoints := sets.NewString() for _, epInfo := range endpoints { - if epInfo.IsReady() { - readyEndpoints.Insert(epInfo.String()) - } - - if onlyNodeLocalEndpoints && epInfo.GetIsLocal() { - if epInfo.IsReady() { - localReadyEndpoints.Insert(epInfo.String()) - } else if epInfo.IsServing() && epInfo.IsTerminating() { - localReadyTerminatingEndpoints.Insert(epInfo.String()) - } - } - } - - newEndpoints := readyEndpoints - if onlyNodeLocalEndpoints { - newEndpoints = localReadyEndpoints - - if utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { - if len(newEndpoints) == 0 && localReadyTerminatingEndpoints.Len() > 0 { - newEndpoints = localReadyTerminatingEndpoints - } - } + newEndpoints.Insert(epInfo.String()) } // Create new endpoints diff --git a/pkg/proxy/service.go b/pkg/proxy/service.go index 1d149adf149..71b141a3376 100644 --- a/pkg/proxy/service.go +++ b/pkg/proxy/service.go @@ -139,6 +139,24 @@ func (info *BaseServiceInfo) HintsAnnotation() string { return info.hintsAnnotation } +// ExternallyAccessible is part of ServicePort interface. +func (info *BaseServiceInfo) ExternallyAccessible() bool { + return info.nodePort != 0 || len(info.loadBalancerStatus.Ingress) != 0 || len(info.externalIPs) != 0 +} + +// UsesClusterEndpoints is part of ServicePort interface. +func (info *BaseServiceInfo) UsesClusterEndpoints() bool { + // The service port uses Cluster endpoints if the internal traffic policy is "Cluster", + // or if it accepts external traffic at all. (Even if the external traffic policy is + // "Local", we need Cluster endpoints to implement short circuiting.) + return !info.nodeLocalInternal || info.ExternallyAccessible() +} + +// UsesLocalEndpoints is part of ServicePort interface. +func (info *BaseServiceInfo) UsesLocalEndpoints() bool { + return info.nodeLocalInternal || (info.nodeLocalExternal && info.ExternallyAccessible()) +} + func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo { nodeLocalExternal := false if apiservice.RequestsOnlyLocalTraffic(service) { diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index ae7274c5b25..ef16d559b1c 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -23,78 +23,165 @@ import ( "k8s.io/kubernetes/pkg/features" ) -// FilterEndpoints filters endpoints based on Service configuration, node -// labels, and enabled feature gates. This is primarily used to enable topology -// aware routing. -func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) []Endpoint { - if svcInfo.NodeLocalExternal() { - return endpoints +// CategorizeEndpoints returns: +// +// - The service's usable Cluster-traffic-policy endpoints (taking topology into account, if +// relevant). 
This will be nil if the service does not ever use Cluster traffic policy. +// +// - The service's usable Local-traffic-policy endpoints (including terminating endpoints, if +// relevant). This will be nil if the service does not ever use Local traffic policy. +// +// - The combined list of all endpoints reachable from this node (which is the union of the +// previous two lists, but in the case where it is identical to one or the other, we avoid +// allocating a separate list). +// +// - An indication of whether the service has any endpoints reachable from anywhere in the +// cluster. (This may be true even if allReachableEndpoints is empty.) +func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { + var useTopology, useServingTerminatingEndpoints bool + + if svcInfo.UsesClusterEndpoints() { + useTopology = canUseTopology(endpoints, svcInfo, nodeLabels) + clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { + if !ep.IsReady() { + return false + } + if useTopology && !availableForTopology(ep, nodeLabels) { + return false + } + return true + }) + + // If there are any Ready endpoints anywhere in the cluster, we are + // guaranteed to get one in clusterEndpoints. + if len(clusterEndpoints) > 0 { + hasAnyEndpoints = true + } } - if utilfeature.DefaultFeatureGate.Enabled(features.ServiceInternalTrafficPolicy) && svcInfo.NodeLocalInternal() { - return FilterLocalEndpoint(endpoints) + if !svcInfo.UsesLocalEndpoints() { + allReachableEndpoints = clusterEndpoints + return } - if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { - return filterEndpointsWithHints(endpoints, svcInfo.HintsAnnotation(), nodeLabels) + // Pre-scan the endpoints, to figure out which type of endpoint Local + // traffic policy will use, and also to see if there are any usable + // endpoints anywhere in the cluster. + var hasLocalReadyEndpoints, hasLocalServingTerminatingEndpoints bool + for _, ep := range endpoints { + if ep.IsReady() { + hasAnyEndpoints = true + if ep.GetIsLocal() { + hasLocalReadyEndpoints = true + } + } else if ep.IsServing() && ep.IsTerminating() && utilfeature.DefaultFeatureGate.Enabled(features.ProxyTerminatingEndpoints) { + hasAnyEndpoints = true + if ep.GetIsLocal() { + hasLocalServingTerminatingEndpoints = true + } + } } - return endpoints + if hasLocalReadyEndpoints { + localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { + return ep.GetIsLocal() && ep.IsReady() + }) + } else if hasLocalServingTerminatingEndpoints { + useServingTerminatingEndpoints = true + localEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { + return ep.GetIsLocal() && ep.IsServing() && ep.IsTerminating() + }) + } + + if !svcInfo.UsesClusterEndpoints() { + allReachableEndpoints = localEndpoints + return + } + + if !useTopology && !useServingTerminatingEndpoints { + // !useServingTerminatingEndpoints means that localEndpoints contains only + // Ready endpoints. !useTopology means that clusterEndpoints contains *every* + // Ready endpoint. So clusterEndpoints must be a superset of localEndpoints. + allReachableEndpoints = clusterEndpoints + return + } + + // clusterEndpoints may contain remote endpoints that aren't in localEndpoints, while + // localEndpoints may contain terminating or topologically-unavailable local endpoints + // that aren't in clusterEndpoints. So we have to merge the two lists. 
+ endpointsMap := make(map[string]Endpoint, len(clusterEndpoints)+len(localEndpoints)) + for _, ep := range clusterEndpoints { + endpointsMap[ep.String()] = ep + } + for _, ep := range localEndpoints { + endpointsMap[ep.String()] = ep + } + allReachableEndpoints = make([]Endpoint, 0, len(endpointsMap)) + for _, ep := range endpointsMap { + allReachableEndpoints = append(allReachableEndpoints, ep) + } + + return } -// filterEndpointsWithHints provides filtering based on the hints included in -// EndpointSlices. If any of the following are true, the full list of endpoints -// will be returned without any filtering: -// * The AnnotationTopologyAwareHints annotation is not set to "Auto" for this -// Service. -// * No zone is specified in node labels. -// * No endpoints for this Service have a hint pointing to the zone this -// instance of kube-proxy is running in. -// * One or more endpoints for this Service do not have hints specified. -func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, nodeLabels map[string]string) []Endpoint { +// canUseTopology returns true if topology aware routing is enabled and properly configured +// in this cluster. That is, it checks that: +// * The TopologyAwareHints feature is enabled +// * The "service.kubernetes.io/topology-aware-hints" annotation on this Service is set to "Auto" +// * The node's labels include "topology.kubernetes.io/zone" +// * All of the endpoints for this Service have a topology hint +// * At least one endpoint for this Service is hinted for this node's zone. +func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool { + hintsAnnotation := svcInfo.HintsAnnotation() if hintsAnnotation != "Auto" && hintsAnnotation != "auto" { if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" { klog.InfoS("Skipping topology aware endpoint filtering since Service has unexpected value", "annotationTopologyAwareHints", v1.AnnotationTopologyAwareHints, "hints", hintsAnnotation) } - return endpoints + return false } zone, ok := nodeLabels[v1.LabelTopologyZone] if !ok || zone == "" { klog.InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone) - return endpoints + return false } - filteredEndpoints := []Endpoint{} - + hasEndpointForZone := false for _, endpoint := range endpoints { if !endpoint.IsReady() { continue } if endpoint.GetZoneHints().Len() == 0 { klog.InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint") - return endpoints + return false } + if endpoint.GetZoneHints().Has(zone) { - filteredEndpoints = append(filteredEndpoints, endpoint) + hasEndpointForZone = true } } - if len(filteredEndpoints) == 0 { + if !hasEndpointForZone { klog.InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone) - return endpoints + return false } - return filteredEndpoints + return true } -// FilterLocalEndpoint returns the node local endpoints -func FilterLocalEndpoint(endpoints []Endpoint) []Endpoint { - var filteredEndpoints []Endpoint +// availableForTopology checks if this endpoint is available for use on this node, given +// topology constraints. (It assumes that canUseTopology() returned true.) 
 
-// FilterLocalEndpoint returns the node local endpoints
-func FilterLocalEndpoint(endpoints []Endpoint) []Endpoint {
-	var filteredEndpoints []Endpoint
+// availableForTopology checks if this endpoint is available for use on this node, given
+// topology constraints. (It assumes that canUseTopology() returned true.)
+func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
+	zone := nodeLabels[v1.LabelTopologyZone]
+	return endpoint.GetZoneHints().Has(zone)
+}
+
+// filterEndpoints filters endpoints according to predicate
+func filterEndpoints(endpoints []Endpoint, predicate func(Endpoint) bool) []Endpoint {
+	filteredEndpoints := make([]Endpoint, 0, len(endpoints))
 
-	// Get all the local endpoints
 	for _, ep := range endpoints {
-		if ep.GetIsLocal() {
+		if predicate(ep) {
 			filteredEndpoints = append(filteredEndpoints, ep)
 		}
 	}
diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go
index a151fc2b182..35c110815a2 100644
--- a/pkg/proxy/topology_test.go
+++ b/pkg/proxy/topology_test.go
@@ -45,74 +45,105 @@ func checkExpectedEndpoints(expected sets.String, actual []Endpoint) error {
 	return kerrors.NewAggregate(errs)
 }
 
-func TestFilterEndpoints(t *testing.T) {
+func TestCategorizeEndpoints(t *testing.T) {
 	testCases := []struct {
-		name              string
-		hintsEnabled      bool
-		nodeLabels        map[string]string
-		serviceInfo       ServicePort
-		endpoints         []Endpoint
-		expectedEndpoints sets.String
+		name         string
+		hintsEnabled bool
+		pteEnabled   bool
+		nodeLabels   map[string]string
+		serviceInfo  ServicePort
+		endpoints    []Endpoint
+
+		// We distinguish `nil` ("service doesn't use this kind of endpoints") from
+		// `sets.String()` ("service uses this kind of endpoints but has no endpoints").
+		// allEndpoints can be left unset if only one of clusterEndpoints and
+		// localEndpoints is set, and allEndpoints is identical to it.
+		// onlyRemoteEndpoints should be true if CategorizeEndpoints returns true for
+		// hasAnyEndpoints despite allEndpoints being empty.
+		clusterEndpoints    sets.String
+		localEndpoints      sets.String
+		allEndpoints        sets.String
+		onlyRemoteEndpoints bool
 	}{{
 		name:         "hints enabled, hints annotation == auto",
 		hintsEnabled: true,
 		nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},
-		serviceInfo:  &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "auto"},
+		serviceInfo:  &BaseServiceInfo{hintsAnnotation: "auto"},
 		endpoints: []Endpoint{
 			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "hints, hints annotation == disabled, hints ignored",
 		hintsEnabled: true,
 		nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},
-		serviceInfo:  &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "disabled"},
+		serviceInfo:  &BaseServiceInfo{hintsAnnotation: "disabled"},
 		endpoints: []Endpoint{
 			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "hints, hints annotation == aUto (wrong capitalization), hints ignored",
 		hintsEnabled: true,
 		nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},
-		serviceInfo:  &BaseServiceInfo{nodeLocalExternal: false, hintsAnnotation: "aUto"},
+		serviceInfo:  &BaseServiceInfo{hintsAnnotation: "aUto"},
 		endpoints: []Endpoint{
 			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "hints, hints annotation empty, hints ignored",
 		hintsEnabled: true,
 		nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},
-		serviceInfo:  &BaseServiceInfo{nodeLocalExternal: false},
+		serviceInfo:  &BaseServiceInfo{},
 		endpoints: []Endpoint{
 			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		localEndpoints:   nil,
 	}, {
-		name:         "node local endpoints, hints are ignored",
+		name:         "externalTrafficPolicy: Local, topology ignored for Local endpoints",
 		hintsEnabled: true,
 		nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},
-		serviceInfo:  &BaseServiceInfo{nodeLocalExternal: true, hintsAnnotation: "auto"},
+		serviceInfo:  &BaseServiceInfo{nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"},
 		endpoints: []Endpoint{
-			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
-			&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true},
+			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
+			&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		localEndpoints:   sets.NewString("10.1.2.3:80", "10.1.2.4:80"),
+		allEndpoints:     sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
+	}, {
+		name:         "internalTrafficPolicy: Local, topology ignored for Local endpoints",
+		hintsEnabled: true,
+		nodeLabels:   map[string]string{v1.LabelTopologyZone: "zone-a"},
+		serviceInfo:  &BaseServiceInfo{nodeLocalInternal: true, hintsAnnotation: "auto", nodeLocalExternal: false, nodePort: 8080},
+		endpoints: []Endpoint{
+			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true},
+			&BaseEndpointInfo{Endpoint: "10.1.2.4:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true},
+			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
+			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
+		},
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		localEndpoints:   sets.NewString("10.1.2.3:80", "10.1.2.4:80"),
+		allEndpoints:     sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
 	}, {
 		name:         "empty node labels",
 		hintsEnabled: true,
@@ -121,7 +152,8 @@ func TestFilterEndpoints(t *testing.T) {
 		endpoints: []Endpoint{
 			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "empty zone label",
 		hintsEnabled: true,
@@ -130,7 +162,8 @@ func TestFilterEndpoints(t *testing.T) {
 		endpoints: []Endpoint{
 			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "node in different zone, no endpoint filtering",
 		hintsEnabled: true,
@@ -139,7 +172,8 @@ func TestFilterEndpoints(t *testing.T) {
 		endpoints: []Endpoint{
 			&BaseEndpointInfo{Endpoint: "10.1.2.3:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "normal endpoint filtering, auto annotation",
 		hintsEnabled: true,
@@ -151,7 +185,8 @@ func TestFilterEndpoints(t *testing.T) {
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "unready endpoint",
 		hintsEnabled: true,
@@ -163,7 +198,8 @@ func TestFilterEndpoints(t *testing.T) {
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "only unready endpoints in same zone (should not filter)",
 		hintsEnabled: true,
@@ -175,7 +211,8 @@ func TestFilterEndpoints(t *testing.T) {
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: false},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.4:80", "10.1.2.5:80"),
+		localEndpoints:   nil,
 	}, {
 		name:         "normal endpoint filtering, Auto annotation",
 		hintsEnabled: true,
@@ -187,7 +224,8 @@ func TestFilterEndpoints(t *testing.T) {
 			&BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true},
 			&BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true},
 		},
-		expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.6:80"),
+		localEndpoints:   nil,
 	}, {
name: "hintsAnnotation empty, no filtering applied", hintsEnabled: true, @@ -199,7 +237,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "hintsAnnotation disabled, no filtering applied", hintsEnabled: true, @@ -211,7 +250,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "missing hints, no filtering applied", hintsEnabled: true, @@ -223,7 +263,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: nil, Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-a"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "multiple hints per endpoint, filtering includes any endpoint with zone included", hintsEnabled: true, @@ -235,12 +276,28 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.1.2.5:80", ZoneHints: sets.NewString("zone-b", "zone-d"), Ready: true}, &BaseEndpointInfo{Endpoint: "10.1.2.6:80", ZoneHints: sets.NewString("zone-c"), Ready: true}, }, - expectedEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"), + clusterEndpoints: sets.NewString("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"), + localEndpoints: nil, }, { - name: "internalTrafficPolicy: Local, with empty endpoints", - serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, - endpoints: []Endpoint{}, - expectedEndpoints: nil, + name: "conflicting topology and localness require merging allEndpoints", + hintsEnabled: true, + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080, hintsAnnotation: "auto"}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.2:80", ZoneHints: sets.NewString("zone-a"), Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.3:80", ZoneHints: sets.NewString("zone-b"), Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"), + localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80", "10.0.0.2:80"), + }, { + name: "internalTrafficPolicy: Local, with empty endpoints", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, + endpoints: []Endpoint{}, + clusterEndpoints: nil, + localEndpoints: sets.NewString(), }, { name: 
"internalTrafficPolicy: Local, but all endpoints are remote", serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, @@ -248,7 +305,9 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, }, - expectedEndpoints: nil, + clusterEndpoints: nil, + localEndpoints: sets.NewString(), + onlyRemoteEndpoints: true, }, { name: "internalTrafficPolicy: Local, all endpoints are local", serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, @@ -256,7 +315,8 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, }, - expectedEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + clusterEndpoints: nil, + localEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "internalTrafficPolicy: Local, some endpoints are local", serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, @@ -264,17 +324,186 @@ func TestFilterEndpoints(t *testing.T) { &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, }, - expectedEndpoints: sets.NewString("10.0.0.0:80"), + clusterEndpoints: nil, + localEndpoints: sets.NewString("10.0.0.0:80"), + }, { + name: "Cluster traffic policy, endpoints not Ready", + serviceInfo: &BaseServiceInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: nil, + }, { + name: "Cluster traffic policy, some endpoints are Ready", + serviceInfo: &BaseServiceInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true}, + }, + clusterEndpoints: sets.NewString("10.0.0.1:80"), + localEndpoints: nil, + }, { + name: "Cluster traffic policy, PTE enabled, all endpoints are terminating", + pteEnabled: true, + serviceInfo: &BaseServiceInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: nil, + }, { + name: "iTP: Local, eTP: Cluster, some endpoints local", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: false, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString("10.0.0.0:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Cluster, eTP: Local, some endpoints local", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString("10.0.0.0:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Local, eTP: 
Local, some endpoints local", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString("10.0.0.0:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Local, eTP: Local, all endpoints remote", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "iTP: Local, eTP: Local, PTE disabled, all endpoints remote and terminating", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString(), + }, { + name: "iTP: Local, eTP: Local, PTE enabled, all endpoints remote and terminating", + pteEnabled: true, + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString(), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString(), + onlyRemoteEndpoints: true, + }, { + name: "iTP: Cluster, eTP: Local, PTE disabled, with terminating endpoints", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80"), + localEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80"), + }, { + name: "iTP: Cluster, eTP: Local, PTE enabled, with terminating endpoints", + pteEnabled: true, + serviceInfo: &BaseServiceInfo{nodeLocalInternal: false, nodeLocalExternal: true, nodePort: 8080}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: false, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.2:80", Ready: false, Serving: true, Terminating: true, IsLocal: true}, + &BaseEndpointInfo{Endpoint: "10.0.0.3:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80"), + 
localEndpoints: sets.NewString("10.0.0.2:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.2:80"), + }, { + name: "externalTrafficPolicy ignored if not externally accessible", + serviceInfo: &BaseServiceInfo{nodeLocalExternal: true}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, + }, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + localEndpoints: nil, + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), + }, { + name: "no cluster endpoints for iTP:Local internal-only service", + serviceInfo: &BaseServiceInfo{nodeLocalInternal: true}, + endpoints: []Endpoint{ + &BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, + &BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: true}, + }, + clusterEndpoints: nil, + localEndpoints: sets.NewString("10.0.0.1:80"), + allEndpoints: sets.NewString("10.0.0.1:80"), }} for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ProxyTerminatingEndpoints, tc.pteEnabled)() - filteredEndpoints := FilterEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels) - err := checkExpectedEndpoints(tc.expectedEndpoints, filteredEndpoints) + clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels) + + if tc.clusterEndpoints == nil && clusterEndpoints != nil { + t.Errorf("expected no cluster endpoints but got %v", clusterEndpoints) + } else { + err := checkExpectedEndpoints(tc.clusterEndpoints, clusterEndpoints) + if err != nil { + t.Errorf("error with cluster endpoints: %v", err) + } + } + + if tc.localEndpoints == nil && localEndpoints != nil { + t.Errorf("expected no local endpoints but got %v", localEndpoints) + } else { + err := checkExpectedEndpoints(tc.localEndpoints, localEndpoints) + if err != nil { + t.Errorf("error with local endpoints: %v", err) + } + } + + var expectedAllEndpoints sets.String + if tc.clusterEndpoints != nil && tc.localEndpoints == nil { + expectedAllEndpoints = tc.clusterEndpoints + } else if tc.localEndpoints != nil && tc.clusterEndpoints == nil { + expectedAllEndpoints = tc.localEndpoints + } else { + expectedAllEndpoints = tc.allEndpoints + } + err := checkExpectedEndpoints(expectedAllEndpoints, allEndpoints) if err != nil { - t.Errorf(err.Error()) + t.Errorf("error with allEndpoints: %v", err) + } + + expectedHasAnyEndpoints := len(expectedAllEndpoints) > 0 || tc.onlyRemoteEndpoints + if expectedHasAnyEndpoints != hasAnyEndpoints { + t.Errorf("expected hasAnyEndpoints=%v, got %v", expectedHasAnyEndpoints, hasAnyEndpoints) } }) } diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go index b7b6613f061..a68d36109a8 100644 --- a/pkg/proxy/types.go +++ b/pkg/proxy/types.go @@ -91,6 +91,15 @@ type ServicePort interface { InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType // HintsAnnotation returns the value of the v1.AnnotationTopologyAwareHints annotation. 
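A note on the test table's conventions: the nil-versus-empty distinction above mirrors the slices returned by CategorizeEndpoints. A minimal illustration of why the two cases are distinguishable, assuming `sets` is `k8s.io/apimachinery/pkg/util/sets` (this snippet is not part of the diff):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// nil: "this service never uses Local traffic policy".
	var localEndpoints sets.String
	fmt.Println(localEndpoints == nil, localEndpoints.Len()) // true 0

	// Non-nil but empty: "uses Local traffic policy, but currently has no
	// usable local endpoints". Traffic is dropped rather than rejected, as
	// long as hasAnyEndpoints is true.
	localEndpoints = sets.NewString()
	fmt.Println(localEndpoints == nil, localEndpoints.Len()) // false 0
}
```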
diff --git a/pkg/proxy/types.go b/pkg/proxy/types.go
index b7b6613f061..a68d36109a8 100644
--- a/pkg/proxy/types.go
+++ b/pkg/proxy/types.go
@@ -91,6 +91,15 @@ type ServicePort interface {
 	InternalTrafficPolicy() *v1.ServiceInternalTrafficPolicyType
 	// HintsAnnotation returns the value of the v1.AnnotationTopologyAwareHints annotation.
 	HintsAnnotation() string
+	// ExternallyAccessible returns true if the service port is reachable via something
+	// other than ClusterIP (NodePort/ExternalIP/LoadBalancer).
+	ExternallyAccessible() bool
+	// UsesClusterEndpoints returns true if the service port ever sends traffic to
+	// endpoints based on "Cluster" traffic policy.
+	UsesClusterEndpoints() bool
+	// UsesLocalEndpoints returns true if the service port ever sends traffic to
+	// endpoints based on "Local" traffic policy.
+	UsesLocalEndpoints() bool
 }
 
 // Endpoint is an interface which abstracts information about an endpoint.
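The diff does not show BaseServiceInfo's implementations of the three new methods. One plausible shape, consistent with the test expectations above (for example, `eTP: Local` being ignored when the port is not externally accessible) and assuming BaseServiceInfo's existing `nodePort`, `externalIPs`, `loadBalancerStatus`, `nodeLocalInternal`, and `nodeLocalExternal` fields:

```go
// Sketch only; assumes BaseServiceInfo's existing fields. Not copied from the diff.
func (info *BaseServiceInfo) ExternallyAccessible() bool {
	return info.nodePort != 0 || len(info.externalIPs) != 0 || len(info.loadBalancerStatus.Ingress) != 0
}

func (info *BaseServiceInfo) UsesClusterEndpoints() bool {
	// Internal traffic uses Cluster endpoints unless iTP is Local; external
	// traffic uses them when the port is reachable externally and eTP is Cluster.
	return !info.nodeLocalInternal || (info.ExternallyAccessible() && !info.nodeLocalExternal)
}

func (info *BaseServiceInfo) UsesLocalEndpoints() bool {
	return info.nodeLocalInternal || (info.ExternallyAccessible() && info.nodeLocalExternal)
}
```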