Implement PreferSameNode traffic distribution in kube-proxy

This commit is contained in:
Dan Winship 2025-02-11 14:43:04 -05:00
parent c85083589c
commit 88f8e6697d
8 changed files with 174 additions and 51 deletions

View File

@ -46,8 +46,11 @@ type Endpoint interface {
IsTerminating() bool
// ZoneHints returns the zone hint for the endpoint. This is based on
// endpoint.hints.forZones[0].name in the EndpointSlice API.
// endpoint.hints.forZones[*].name in the EndpointSlice API.
ZoneHints() sets.Set[string]
// NodeHints returns the node hint for the endpoint. This is based on
// endpoint.hints.forNodes[*].name in the EndpointSlice API.
NodeHints() sets.Set[string]
}
// BaseEndpointInfo contains base information that defines an endpoint.
@ -78,6 +81,9 @@ type BaseEndpointInfo struct {
// zoneHints represent the zone hints for the endpoint. This is based on
// endpoint.hints.forZones[*].name in the EndpointSlice API.
zoneHints sets.Set[string]
// nodeHints represent the node hints for the endpoint. This is based on
// endpoint.hints.forNodes[*].name in the EndpointSlice API.
nodeHints sets.Set[string]
}
var _ Endpoint = &BaseEndpointInfo{}
@ -119,12 +125,17 @@ func (info *BaseEndpointInfo) IsTerminating() bool {
return info.terminating
}
// ZoneHints returns the zone hint for the endpoint.
// ZoneHints returns the zone hints for the endpoint.
func (info *BaseEndpointInfo) ZoneHints() sets.Set[string] {
return info.zoneHints
}
func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints sets.Set[string]) *BaseEndpointInfo {
// NodeHints returns the node hints for the endpoint.
func (info *BaseEndpointInfo) NodeHints() sets.Set[string] {
return info.nodeHints
}
func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminating bool, zoneHints, nodeHints sets.Set[string]) *BaseEndpointInfo {
return &BaseEndpointInfo{
ip: ip,
port: port,
@ -134,5 +145,6 @@ func newBaseEndpointInfo(ip string, port int, isLocal, ready, serving, terminati
serving: serving,
terminating: terminating,
zoneHints: zoneHints,
nodeHints: nodeHints,
}
}

View File

@ -25,7 +25,9 @@ import (
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
utilnet "k8s.io/utils/net"
)
@ -210,17 +212,25 @@ func (cache *EndpointSliceCache) addEndpoints(svcPortName *ServicePortName, port
serving := endpoint.Conditions.Serving == nil || *endpoint.Conditions.Serving
terminating := endpoint.Conditions.Terminating != nil && *endpoint.Conditions.Terminating
var zoneHints sets.Set[string]
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
zoneHints = sets.New[string]()
for _, zone := range endpoint.Hints.ForZones {
zoneHints.Insert(zone.Name)
var zoneHints, nodeHints sets.Set[string]
if endpoint.Hints != nil {
if len(endpoint.Hints.ForZones) > 0 {
zoneHints = sets.New[string]()
for _, zone := range endpoint.Hints.ForZones {
zoneHints.Insert(zone.Name)
}
}
if len(endpoint.Hints.ForNodes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.PreferSameTrafficDistribution) {
nodeHints = sets.New[string]()
for _, node := range endpoint.Hints.ForNodes {
nodeHints.Insert(node.Name)
}
}
}
endpointIP := utilnet.ParseIPSloppy(endpoint.Addresses[0]).String()
endpointInfo := newBaseEndpointInfo(endpointIP, portNum, isLocal,
ready, serving, terminating, zoneHints)
ready, serving, terminating, zoneHints, nodeHints)
// This logic ensures we're deduplicating potential overlapping endpoints
// isLocal should not vary between matching endpoints, but if it does, we

View File

@ -987,7 +987,7 @@ func (proxier *Proxier) syncProxyRules() {
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
allEndpoints := proxier.endpointsMap[svcName]
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
// clusterPolicyChain contains the endpoints used with "Cluster" traffic policy
clusterPolicyChain := svcInfo.clusterPolicyChainName

View File

@ -1843,7 +1843,7 @@ func (proxier *Proxier) syncEndpoint(svcPortName proxy.ServicePortName, onlyNode
if !ok {
proxier.logger.Info("Unable to filter endpoints due to missing service info", "servicePortName", svcPortName)
} else {
clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeLabels)
clusterEndpoints, localEndpoints, _, hasAnyEndpoints := proxy.CategorizeEndpoints(endpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
if onlyNodeLocalEndpoints {
if len(localEndpoints) > 0 {
endpoints = localEndpoints

View File

@ -1310,7 +1310,7 @@ func (proxier *Proxier) syncProxyRules() {
// from this node, given the service's traffic policies. hasEndpoints is true
// if the service has any usable endpoints on any node, not just this one.
allEndpoints := proxier.endpointsMap[svcName]
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeLabels)
clusterEndpoints, localEndpoints, allLocallyReachableEndpoints, hasEndpoints := proxy.CategorizeEndpoints(allEndpoints, svcInfo, proxier.nodeName, proxier.nodeLabels)
// skipServiceUpdate is used for all service-related chains and their elements.
// If no changes were done to the service or its endpoints, these objects may be skipped.

View File

@ -18,7 +18,9 @@ package proxy
import (
v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
)
// CategorizeEndpoints returns:
@ -39,16 +41,18 @@ import (
// "Usable endpoints" means Ready endpoints by default, but will fall back to
// Serving-Terminating endpoints (independently for Cluster and Local) if no Ready
// endpoints are available.
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
var useTopology, useServingTerminatingEndpoints bool
func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeName string, nodeLabels map[string]string) (clusterEndpoints, localEndpoints, allReachableEndpoints []Endpoint, hasAnyEndpoints bool) {
var topologyMode string
var useServingTerminatingEndpoints bool
if svcInfo.UsesClusterEndpoints() {
useTopology = canUseTopology(endpoints, nodeLabels)
zone := nodeLabels[v1.LabelTopologyZone]
topologyMode = topologyModeFromHints(svcInfo, endpoints, nodeName, zone)
clusterEndpoints = filterEndpoints(endpoints, func(ep Endpoint) bool {
if !ep.IsReady() {
return false
}
if useTopology && !availableForTopology(ep, nodeLabels) {
if !availableForTopology(ep, topologyMode, nodeName, zone) {
return false
}
return true
@ -114,9 +118,9 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
return
}
if !useTopology && !useServingTerminatingEndpoints {
if topologyMode == "" && !useServingTerminatingEndpoints {
// !useServingTerminatingEndpoints means that localEndpoints contains only
// Ready endpoints. !useTopology means that clusterEndpoints contains *every*
// Ready endpoints. topologyMode=="" means that clusterEndpoints contains *every*
// Ready endpoint. So clusterEndpoints must be a superset of localEndpoints.
allReachableEndpoints = clusterEndpoints
return
@ -140,48 +144,77 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
return
}
// canUseTopology returns true if all of the following are true:
// - The node's labels include "topology.kubernetes.io/zone".
// - All of the endpoints for this Service have a topology hint.
// - At least one endpoint for this Service is hinted for this node's zone.
func canUseTopology(endpoints []Endpoint, nodeLabels map[string]string) bool {
zone, foundZone := nodeLabels[v1.LabelTopologyZone]
// topologyModeFromHints returns a topology mode ("", "PreferSameZone", or
// "PreferSameNode") based on the Endpoint hints:
// - If the PreferSameTrafficDistribution feature gate is enabled, and every ready
// endpoint has a node hint, and at least one endpoint is hinted for this node, then
// it returns "PreferSameNode".
// - Otherwise, if every ready endpoint has a zone hint, and at least one endpoint is
// hinted for this node's zone, then it returns "PreferSameZone".
// - Otherwise it returns "" (meaning, no topology / default traffic distribution).
func topologyModeFromHints(svcInfo ServicePort, endpoints []Endpoint, nodeName, zone string) string {
hasEndpointForNode := false
allEndpointsHaveNodeHints := true
hasEndpointForZone := false
allEndpointsHaveZoneHints := true
for _, endpoint := range endpoints {
if !endpoint.IsReady() {
continue
}
// If any of the endpoints do not have zone hints, we bail out
if endpoint.NodeHints().Len() == 0 {
allEndpointsHaveNodeHints = false
} else if endpoint.NodeHints().Has(nodeName) {
hasEndpointForNode = true
}
if endpoint.ZoneHints().Len() == 0 {
klog.V(7).InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint", "endpoint", endpoint)
return false
}
// If we've made it this far, we have endpoints with hints set. Now we check if there is a
// zone label, if there isn't one we log a warning and bail out
if !foundZone || zone == "" {
klog.V(2).InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
return false
}
if endpoint.ZoneHints().Has(zone) {
allEndpointsHaveZoneHints = false
} else if endpoint.ZoneHints().Has(zone) {
hasEndpointForZone = true
}
}
if !hasEndpointForZone {
klog.V(7).InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
return false
if utilfeature.DefaultFeatureGate.Enabled(features.PreferSameTrafficDistribution) {
if allEndpointsHaveNodeHints {
if hasEndpointForNode {
return v1.ServiceTrafficDistributionPreferSameNode
}
klog.V(2).InfoS("Ignoring same-node topology hints for service since no hints were provided for node", "service", svcInfo, "node", nodeName)
} else {
klog.V(7).InfoS("Ignoring same-node topology hints for service since one or more endpoints is missing a node hint", "service", svcInfo)
}
}
return true
if allEndpointsHaveZoneHints {
if hasEndpointForZone {
return v1.ServiceTrafficDistributionPreferSameZone
}
if zone == "" {
klog.V(2).InfoS("Ignoring same-zone topology hints for service since node is missing label", "service", svcInfo, "label", v1.LabelTopologyZone)
} else {
klog.V(2).InfoS("Ignoring same-zone topology hints for service since no hints were provided for zone", "service", svcInfo, "zone", zone)
}
} else {
klog.V(7).InfoS("Ignoring same-zone topology hints for service since one or more endpoints is missing a zone hint", "service", svcInfo.String())
}
return ""
}
// availableForTopology checks if this endpoint is available for use on this node, given
// topology constraints. (It assumes that canUseTopology() returned true.)
func availableForTopology(endpoint Endpoint, nodeLabels map[string]string) bool {
zone := nodeLabels[v1.LabelTopologyZone]
return endpoint.ZoneHints().Has(zone)
// availableForTopology checks if this endpoint is available for use on this node when
// using the given topologyMode. (Note that there's no fallback here; the fallback happens
// when deciding which mode to use, not when applying that decision.)
func availableForTopology(endpoint Endpoint, topologyMode, nodeName, zone string) bool {
switch topologyMode {
case "":
return true
case v1.ServiceTrafficDistributionPreferSameNode:
return endpoint.NodeHints().Has(nodeName)
case v1.ServiceTrafficDistributionPreferSameZone:
return endpoint.ZoneHints().Has(zone)
default:
return false
}
}
// filterEndpoints filters endpoints according to predicate

View File

@ -23,6 +23,9 @@ import (
v1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
)
func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error {
@ -44,10 +47,12 @@ func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error
func TestCategorizeEndpoints(t *testing.T) {
testCases := []struct {
name string
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint
name string
preferSameEnabled bool
nodeName string
nodeLabels map[string]string
serviceInfo ServicePort
endpoints []Endpoint
// We distinguish `nil` ("service doesn't use this kind of endpoints") from
// `sets.Set[string]()` ("service uses this kind of endpoints but has no endpoints").
@ -172,6 +177,62 @@ func TestCategorizeEndpoints(t *testing.T) {
},
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "PreferSameNode falls back to same-zone when feature gate disabled",
preferSameEnabled: false,
nodeName: "node-1",
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
},
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "PreferSameNode available",
preferSameEnabled: true,
nodeName: "node-1",
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
},
clusterEndpoints: sets.New[string]("10.1.2.3:80"),
localEndpoints: nil,
}, {
name: "PreferSameNode ignored if some endpoints unhinted",
preferSameEnabled: true,
nodeName: "node-1",
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
},
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "PreferSameNode falls back to PreferSameZone if no endpoint for node",
preferSameEnabled: true,
nodeName: "node-0",
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
serviceInfo: &BaseServicePortInfo{},
endpoints: []Endpoint{
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-1"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), nodeHints: sets.New[string]("node-2"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), nodeHints: sets.New[string]("node-3"), ready: true},
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), nodeHints: sets.New[string]("node-4"), ready: true},
},
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
localEndpoints: nil,
}, {
name: "conflicting topology and localness require merging allEndpoints",
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
@ -333,7 +394,9 @@ func TestCategorizeEndpoints(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreferSameTrafficDistribution, tc.preferSameEnabled)
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeName, tc.nodeLabels)
if tc.clusterEndpoints == nil && clusterEndpoints != nil {
t.Errorf("expected no cluster endpoints but got %v", clusterEndpoints)

View File

@ -358,6 +358,11 @@ func (info *endpointInfo) ZoneHints() sets.Set[string] {
return sets.Set[string]{}
}
// NodeHints returns the node hints for the endpoint.
func (info *endpointInfo) NodeHints() sets.Set[string] {
return sets.Set[string]{}
}
// IP returns just the IP part of the endpoint, it's a part of proxy.Endpoint interface.
func (info *endpointInfo) IP() string {
return info.ip