diff --git a/pkg/proxy/endpoint.go b/pkg/proxy/endpoint.go index 474edf36a91..f7323881abe 100644 --- a/pkg/proxy/endpoint.go +++ b/pkg/proxy/endpoint.go @@ -46,8 +46,11 @@ type Endpoint interface { IsTerminating() bool // ZoneHints returns the zone hint for the endpoint. This is based on - // endpoint.hints.forZones[0].name in the EndpointSlice API. + // endpoint.hints.forZones[*].name in the EndpointSlice API. ZoneHints() sets.Set[string] + // NodeHints returns the node hint for the endpoint. This is based on + // endpoint.hints.forNodes[*].name in the EndpointSlice API. + NodeHints() sets.Set[string] } // BaseEndpointInfo contains base information that defines an endpoint. @@ -78,6 +81,9 @@ type BaseEndpointInfo struct { // zoneHints represent the zone hints for the endpoint. This is based on // endpoint.hints.forZones[*].name in the EndpointSlice API. zoneHints sets.Set[string] + // nodeHints represent the node hints for the endpoint. This is based on + // endpoint.hints.forNodes[*].name in the EndpointSlice API. + nodeHints sets.Set[string] } var _ Endpoint = &BaseEndpointInfo{} @@ -119,12 +125,17 @@ func (info *BaseEndpointInfo) IsTerminating() bool { return info.terminating } -// ZoneHints returns the zone hint for the endpoint. +// ZoneHints returns the zone hints for the endpoint. func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] { return info.zoneHints } -func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo { +// NodeHints returns the node hints for the endpoint. +func (info *BaseEndpointInfo) NodeHints() sets.Set[string] { + return info.nodeHints +} + +func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints, nodeHints sets.Set[string]) *BaseEndpointInfo { return &BaseEndpointInfo{ ip: ip, port: port, @@ -134,5 +145,6 @@ func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminati serving: serving, terminating: terminating, zoneHints: zoneHints, + nodeHints: nodeHints, } } diff --git a/pkg/proxy/endpointslicecache.go b/pkg/proxy/endpointslicecache.go index c194afd52f5..1d1eea4f40a 100644 --- a/pkg/proxy/endpointslicecache.go +++ b/pkg/proxy/endpointslicecache.go @@ -25,7 +25,9 @@ import ( discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" utilnet "k8s.io/utils/net" ) @@ -210,17 +212,25 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port serving := endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving terminating := endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating - var zoneHints sets.Set[string] - if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { - zoneHints = sets.New[string]() - for _, zone := range endpoint.Hints.ForZones { - zoneHints.Insert(zone.Name) + var zoneHints, nodeHints sets.Set[string] + if endpoint.Hints != nil { + if len(endpoint.Hints.ForZones) > 0 { + zoneHints = sets.New[string]() + for _, zone := range endpoint.Hints.ForZones { + zoneHints.Insert(zone.Name) + } + } + if len(endpoint.Hints.ForNodes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.PreferSameTrafficDistribution) { + nodeHints = sets.New[string]() + for _, node := range endpoint.Hints.ForNodes { + nodeHints.Insert(node.Name) + } } } endpointIP := utilnet.ParseIPSloppy(endpoint.Addresses[0]).String() endpointInfo := newBaseEndpointInfo(endpointIP, portNum, isLocal, - ready, serving, terminating, zoneHints) + ready, serving, terminating, zoneHints, nodeHints) // This logic ensures we're deduplicating potential overlapping endpoints // isLocal should not vary between matching endpoints, but if it does, we diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index cc1394baef8..d50e6c06bd2 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -987,7 +987,7 @@ func (proxier *Proxier) syncProxyRules() { // from this node, given the service's traffic policies. hasEndpoints is true // if the service has any usable endpoints on any node, not just this one. allEndpoints := proxier.endpointsMap[svcName] - clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) // clusterPolicyChain contains the endpoints used with "Cluster" traffic policy clusterPolicyChain := svcInfo.clusterPolicyChainName diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 85bd8950eaf..51a6cf3faa5 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1843,7 +1843,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode if !ok { proxier.logger.Info("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName) } else { - clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels) + clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) if onlyNodeLocalEndpoints { if len(localEndpoints) > 0 { endpoints = localEndpoints diff --git a/pkg/proxy/nftables/proxier.go b/pkg/proxy/nftables/proxier.go index b1bba524119..15590214325 100644 --- a/pkg/proxy/nftables/proxier.go +++ b/pkg/proxy/nftables/proxier.go @@ -1310,7 +1310,7 @@ func (proxier *Proxier) syncProxyRules() { // from this node, given the service's traffic policies. hasEndpoints is true // if the service has any usable endpoints on any node, not just this one. allEndpoints := proxier.endpointsMap[svcName] - clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels) + clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels) // skipServiceUpdate is used for all service-related chains and their elements. // If no changes were done to the service or its endpoints, these objects may be skipped. diff --git a/pkg/proxy/topology.go b/pkg/proxy/topology.go index 2a514b0816f..c91961d7cec 100644 --- a/pkg/proxy/topology.go +++ b/pkg/proxy/topology.go @@ -18,7 +18,9 @@ package proxy import ( v1 "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" ) // CategorizeEndpoints returns: @@ -39,16 +41,18 @@ import ( // "Usable endpoints" means Ready endpoints by default, but will fall back to // Serving-Terminating endpoints (independently for Cluster and Local) if no Ready // endpoints are available. -func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { - var useTopology, useServingTerminatingEndpoints bool +func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) { + var topologyMode string + var useServingTerminatingEndpoints bool if svcInfo.UsesClusterEndpoints() { - useTopology = canUseTopology(endpoints, nodeLabels) + zone := nodeLabels[v1.LabelTopologyZone] + topologyMode = topologyModeFromHints(svcInfo, endpoints, nodeName, zone) clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool { if !ep.IsReady() { return false } - if useTopology && !availableForTopology(ep, nodeLabels) { + if !availableForTopology(ep, topologyMode, nodeName, zone) { return false } return true @@ -114,9 +118,9 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m return } - if !useTopology && !useServingTerminatingEndpoints { + if topologyMode == "" && !useServingTerminatingEndpoints { // !useServingTerminatingEndpoints means that localEndpoints contains only - // Ready endpoints. !useTopology means that clusterEndpoints contains *every* + // Ready endpoints. topologyMode=="" means that clusterEndpoints contains *every* // Ready endpoint. So clusterEndpoints must be a superset of localEndpoints. allReachableEndpoints = clusterEndpoints return @@ -140,48 +144,77 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m return } -// canUseTopology returns true if all of the following are true: -// - The node's labels include "topology.kubernetes.io/zone". -// - All of the endpoints for this Service have a topology hint. -// - At least one endpoint for this Service is hinted for this node's zone. -func canUseTopology(endpoints []Endpoint, nodeLabels map[string]string) bool { - zone, foundZone := nodeLabels[v1.LabelTopologyZone] +// topologyModeFromHints returns a topology mode ("", "PreferSameZone", or +// "PreferSameNode") based on the Endpoint hints: +// - If the PreferSameTrafficDistribution feature gate is enabled, and every ready +// endpoint has a node hint, and at least one endpoint is hinted for this node, then +// it returns "PreferSameNode". +// - Otherwise, if every ready endpoint has a zone hint, and at least one endpoint is +// hinted for this node's zone, then it returns "PreferSameZone". +// - Otherwise it returns "" (meaning, no topology / default traffic distribution). +func topologyModeFromHints(svcInfo ServicePort, endpoints []Endpoint, nodeName, zone string) string { + hasEndpointForNode := false + allEndpointsHaveNodeHints := true hasEndpointForZone := false + allEndpointsHaveZoneHints := true for _, endpoint := range endpoints { if !endpoint.IsReady() { continue } - // If any of the endpoints do not have zone hints, we bail out + if endpoint.NodeHints().Len() == 0 { + allEndpointsHaveNodeHints = false + } else if endpoint.NodeHints().Has(nodeName) { + hasEndpointForNode = true + } + if endpoint.ZoneHints().Len() == 0 { - klog.V(7).InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint", "endpoint", endpoint) - return false - } - - // If we've made it this far, we have endpoints with hints set. Now we check if there is a - // zone label, if there isn't one we log a warning and bail out - if !foundZone || zone == "" { - klog.V(2).InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone) - return false - } - - if endpoint.ZoneHints().Has(zone) { + allEndpointsHaveZoneHints = false + } else if endpoint.ZoneHints().Has(zone) { hasEndpointForZone = true } } - if !hasEndpointForZone { - klog.V(7).InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone) - return false + if utilfeature.DefaultFeatureGate.Enabled(features.PreferSameTrafficDistribution) { + if allEndpointsHaveNodeHints { + if hasEndpointForNode { + return v1.ServiceTrafficDistributionPreferSameNode + } + klog.V(2).InfoS("Ignoring same-node topology hints for service since no hints were provided for node", "service", svcInfo, "node", nodeName) + } else { + klog.V(7).InfoS("Ignoring same-node topology hints for service since one or more endpoints is missing a node hint", "service", svcInfo) + } } - return true + if allEndpointsHaveZoneHints { + if hasEndpointForZone { + return v1.ServiceTrafficDistributionPreferSameZone + } + if zone == "" { + klog.V(2).InfoS("Ignoring same-zone topology hints for service since node is missing label", "service", svcInfo, "label", v1.LabelTopologyZone) + } else { + klog.V(2).InfoS("Ignoring same-zone topology hints for service since no hints were provided for zone", "service", svcInfo, "zone", zone) + } + } else { + klog.V(7).InfoS("Ignoring same-zone topology hints for service since one or more endpoints is missing a zone hint", "service", svcInfo.String()) + } + + return "" } -// availableForTopology checks if this endpoint is available for use on this node, given -// topology constraints. (It assumes that canUseTopology() returned true.) -func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool { - zone := nodeLabels[v1.LabelTopologyZone] - return endpoint.ZoneHints().Has(zone) +// availableForTopology checks if this endpoint is available for use on this node when +// using the given topologyMode. (Note that there's no fallback here; the fallback happens +// when deciding which mode to use, not when applying that decision.) +func availableForTopology(endpoint Endpoint, topologyMode, nodeName, zone string) bool { + switch topologyMode { + case "": + return true + case v1.ServiceTrafficDistributionPreferSameNode: + return endpoint.NodeHints().Has(nodeName) + case v1.ServiceTrafficDistributionPreferSameZone: + return endpoint.ZoneHints().Has(zone) + default: + return false + } } // filterEndpoints filters endpoints according to predicate diff --git a/pkg/proxy/topology_test.go b/pkg/proxy/topology_test.go index 96d0cd549fd..807dc513d95 100644 --- a/pkg/proxy/topology_test.go +++ b/pkg/proxy/topology_test.go @@ -23,6 +23,9 @@ import ( v1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" ) func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error { @@ -44,10 +47,12 @@ func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error func TestCategorizeEndpoints(t *testing.T) { testCases := []struct { - name string - nodeLabels map[string]string - serviceInfo ServicePort - endpoints []Endpoint + name string + preferSameEnabled bool + nodeName string + nodeLabels map[string]string + serviceInfo ServicePort + endpoints []Endpoint // We distinguish `nil` ("service doesn't use this kind of endpoints") from // `sets.Set[string]()` ("service uses this kind of endpoints but has no endpoints"). @@ -172,6 +177,62 @@ func TestCategorizeEndpoints(t *testing.T) { }, clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"), localEndpoints: nil, + }, { + name: "PreferSameNode falls back to same-zone when feature gate disabled", + preferSameEnabled: false, + nodeName: "node-1", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServicePortInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true}, + }, + clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"), + localEndpoints: nil, + }, { + name: "PreferSameNode available", + preferSameEnabled: true, + nodeName: "node-1", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServicePortInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true}, + }, + clusterEndpoints: sets.New[string]("10.1.2.3:80"), + localEndpoints: nil, + }, { + name: "PreferSameNode ignored if some endpoints unhinted", + preferSameEnabled: true, + nodeName: "node-1", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServicePortInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.4:80", ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true}, + }, + clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"), + localEndpoints: nil, + }, { + name: "PreferSameNode falls back to PreferSameZone if no endpoint for node", + preferSameEnabled: true, + nodeName: "node-0", + nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, + serviceInfo: &BaseServicePortInfo{}, + endpoints: []Endpoint{ + &BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true}, + &BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true}, + }, + clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"), + localEndpoints: nil, }, { name: "conflicting topology and localness require merging allEndpoints", nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"}, @@ -333,7 +394,9 @@ func TestCategorizeEndpoints(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferSameTrafficDistribution, tc.preferSameEnabled) + + clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeName, tc.nodeLabels) if tc.clusterEndpoints == nil && clusterEndpoints != nil { t.Errorf("expected no cluster endpoints but got %v", clusterEndpoints) diff --git a/pkg/proxy/winkernel/proxier.go b/pkg/proxy/winkernel/proxier.go index 70bb3303cb6..d3504964200 100644 --- a/pkg/proxy/winkernel/proxier.go +++ b/pkg/proxy/winkernel/proxier.go @@ -358,6 +358,11 @@ func (info *endpointInfo) ZoneHints() sets.Set[string] { return sets.Set[string]{} } +// NodeHints returns the node hints for the endpoint. +func (info *endpointInfo) NodeHints() sets.Set[string] { + return sets.Set[string]{} +} + // IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface. func (info *endpointInfo) IP() string { return info.ip