Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-06 02:34:03 +00:00)

Merge pull request #123487 from gauravkghildiyal/kep-4444

Introduce trafficDistribution field for Kubernetes Services

Commit: a76a3e031f
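For readers who want to see the new field in context before the diff: the sketch below is not part of this PR, it simply builds a Service that opts into the new behaviour using the ServiceTrafficDistributionPreferClose constant added here. The names "my-service" and "my-app" are placeholders, and the field is only persisted when the ServiceTrafficDistribution feature gate is enabled (otherwise the strategy change further down drops it on create).

    package example

    import (
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // buildService returns a Service that asks implementations to prefer
    // topologically close endpoints. TrafficDistribution is a *string, so the
    // constant is copied into a local variable whose address is taken.
    func buildService() *corev1.Service {
        preferClose := corev1.ServiceTrafficDistributionPreferClose
        return &corev1.Service{
            ObjectMeta: metav1.ObjectMeta{Name: "my-service", Namespace: "default"},
            Spec: corev1.ServiceSpec{
                Selector:            map[string]string{"app": "my-app"},
                Ports:               []corev1.ServicePort{{Port: 80}},
                TrafficDistribution: &preferClose,
            },
        }
    }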
api/openapi-spec/swagger.json (generated, 4 changes)
@@ -10673,6 +10673,10 @@
          "$ref": "#/definitions/io.k8s.api.core.v1.SessionAffinityConfig",
          "description": "sessionAffinityConfig contains the configurations of session affinity."
        },
        "trafficDistribution": {
          "description": "TrafficDistribution offers a way to express preferences for how traffic is distributed to Service endpoints. Implementations can use this field as a hint, but are not required to guarantee strict adherence. If the field is not set, the implementation will apply its default routing strategy. If set to \"PreferClose\", implementations should prioritize endpoints that are topologically close (e.g., same zone).",
          "type": "string"
        },
        "type": {
          "description": "type determines how the Service is exposed. Defaults to ClusterIP. Valid options are ExternalName, ClusterIP, NodePort, and LoadBalancer. \"ClusterIP\" allocates a cluster-internal IP address for load-balancing to endpoints. Endpoints are determined by the selector or if that is not specified, by manual construction of an Endpoints object or EndpointSlice objects. If clusterIP is \"None\", no virtual IP is allocated and the endpoints are published as a set of endpoints rather than a virtual IP. \"NodePort\" builds on ClusterIP and allocates a port on every node which routes to the same endpoints as the clusterIP. \"LoadBalancer\" builds on NodePort and creates an external load-balancer (if supported in the current cloud) which routes to the same endpoints as the clusterIP. \"ExternalName\" aliases this service to the specified externalName. Several other fields do not apply to ExternalName services. More info: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types",
          "type": "string"
@@ -7447,6 +7447,10 @@
          ],
          "description": "sessionAffinityConfig contains the configurations of session affinity."
        },
        "trafficDistribution": {
          "description": "TrafficDistribution offers a way to express preferences for how traffic is distributed to Service endpoints. Implementations can use this field as a hint, but are not required to guarantee strict adherence. If the field is not set, the implementation will apply its default routing strategy. If set to \"PreferClose\", implementations should prioritize endpoints that are topologically close (e.g., same zone).",
          "type": "string"
        },
        "type": {
          "description": "type determines how the Service is exposed. Defaults to ClusterIP. Valid options are ExternalName, ClusterIP, NodePort, and LoadBalancer. \"ClusterIP\" allocates a cluster-internal IP address for load-balancing to endpoints. Endpoints are determined by the selector or if that is not specified, by manual construction of an Endpoints object or EndpointSlice objects. If clusterIP is \"None\", no virtual IP is allocated and the endpoints are published as a set of endpoints rather than a virtual IP. \"NodePort\" builds on ClusterIP and allocates a port on every node which routes to the same endpoints as the clusterIP. \"LoadBalancer\" builds on NodePort and creates an external load-balancer (if supported in the current cloud) which routes to the same endpoints as the clusterIP. \"ExternalName\" aliases this service to the specified externalName. Several other fields do not apply to ExternalName services. More info: https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types",
          "type": "string"
@@ -4163,6 +4163,18 @@ const (
    ServiceExternalTrafficPolicyLocal ServiceExternalTrafficPolicy = "Local"
)

// These are valid values for the TrafficDistribution field of a Service.
const (
    // Indicates a preference for routing traffic to endpoints that are
    // topologically proximate to the client. The interpretation of "topologically
    // proximate" may vary across implementations and could encompass endpoints
    // within the same node, rack, zone, or even region. Setting this value gives
    // implementations permission to make different tradeoffs, e.g. optimizing for
    // proximity rather than equal distribution of load. Users should not set this
    // value if such tradeoffs are not acceptable.
    ServiceTrafficDistributionPreferClose = "PreferClose"
)

// These are the valid conditions of a service.
const (
    // LoadBalancerPortsError represents the condition of the requested ports

@@ -4426,6 +4438,15 @@ type ServiceSpec struct {
    // (possibly modified by topology and other features).
    // +optional
    InternalTrafficPolicy *ServiceInternalTrafficPolicy

    // TrafficDistribution offers a way to express preferences for how traffic is
    // distributed to Service endpoints. Implementations can use this field as a
    // hint, but are not required to guarantee strict adherence. If the field is
    // not set, the implementation will apply its default routing strategy. If set
    // to "PreferClose", implementations should prioritize endpoints that are
    // topologically close (e.g., same zone).
    // +optional
    TrafficDistribution *string
}

// ServicePort represents the port on which the service is exposed
pkg/apis/core/v1/zz_generated.conversion.go (generated, 2 changes)

@@ -8049,6 +8049,7 @@ func autoConvert_v1_ServiceSpec_To_core_ServiceSpec(in *v1.ServiceSpec, out *cor
    out.AllocateLoadBalancerNodePorts = (*bool)(unsafe.Pointer(in.AllocateLoadBalancerNodePorts))
    out.LoadBalancerClass = (*string)(unsafe.Pointer(in.LoadBalancerClass))
    out.InternalTrafficPolicy = (*core.ServiceInternalTrafficPolicy)(unsafe.Pointer(in.InternalTrafficPolicy))
    out.TrafficDistribution = (*string)(unsafe.Pointer(in.TrafficDistribution))
    return nil
}

@@ -8077,6 +8078,7 @@ func autoConvert_core_ServiceSpec_To_v1_ServiceSpec(in *core.ServiceSpec, out *v
    out.AllocateLoadBalancerNodePorts = (*bool)(unsafe.Pointer(in.AllocateLoadBalancerNodePorts))
    out.LoadBalancerClass = (*string)(unsafe.Pointer(in.LoadBalancerClass))
    out.InternalTrafficPolicy = (*v1.ServiceInternalTrafficPolicy)(unsafe.Pointer(in.InternalTrafficPolicy))
    out.TrafficDistribution = (*string)(unsafe.Pointer(in.TrafficDistribution))
    return nil
}
@@ -5534,6 +5534,9 @@ func ValidateService(service *core.Service) field.ErrorList {
    // internal traffic policy field
    allErrs = append(allErrs, validateServiceInternalTrafficFieldsValue(service)...)

    // traffic distribution field
    allErrs = append(allErrs, validateServiceTrafficDistribution(service)...)

    return allErrs
}

@@ -5651,6 +5654,22 @@ func validateServiceInternalTrafficFieldsValue(service *core.Service) field.Erro
    return allErrs
}

// validateServiceTrafficDistribution validates the values for the
// trafficDistribution field.
func validateServiceTrafficDistribution(service *core.Service) field.ErrorList {
    allErrs := field.ErrorList{}

    if service.Spec.TrafficDistribution == nil {
        return allErrs
    }

    if *service.Spec.TrafficDistribution != v1.ServiceTrafficDistributionPreferClose {
        allErrs = append(allErrs, field.NotSupported(field.NewPath("spec").Child("trafficDistribution"), *service.Spec.TrafficDistribution, []string{v1.ServiceTrafficDistributionPreferClose}))
    }

    return allErrs
}

// ValidateServiceCreate validates Services as they are created.
func ValidateServiceCreate(service *core.Service) field.ErrorList {
    return ValidateService(service)
@@ -17121,6 +17121,18 @@ func TestValidateServiceCreate(t *testing.T) {
            s.Annotations[core.AnnotationTopologyMode] = "different"
        },
        numErrs: 1,
    }, {
        name: "valid: trafficDistribution field set to PreferClose",
        tweakSvc: func(s *core.Service) {
            s.Spec.TrafficDistribution = utilpointer.String("PreferClose")
        },
        numErrs: 0,
    }, {
        name: "invalid: trafficDistribution field set to Random",
        tweakSvc: func(s *core.Service) {
            s.Spec.TrafficDistribution = utilpointer.String("Random")
        },
        numErrs: 1,
    },
    }
pkg/apis/core/zz_generated.deepcopy.go (generated, 5 changes)

@@ -5677,6 +5677,11 @@ func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
        *out = new(ServiceInternalTrafficPolicy)
        **out = **in
    }
    if in.TrafficDistribution != nil {
        in, out := &in.TrafficDistribution, &out.TrafficDistribution
        *out = new(string)
        **out = **in
    }
    return
}
@@ -175,6 +175,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
        c.topologyCache,
        c.eventRecorder,
        controllerName,
        endpointslicerec.WithTrafficDistributionEnabled(utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution)),
    )

    return c
@@ -738,6 +738,13 @@ const (
    // Subdivide the NodePort range for dynamic and static port allocation.
    ServiceNodePortStaticSubrange featuregate.Feature = "ServiceNodePortStaticSubrange"

    // owner: @gauravkghildiyal @robscott
    // kep: https://kep.k8s.io/4444
    // alpha: v1.30
    //
    // Enables trafficDistribution field on Services.
    ServiceTrafficDistribution featuregate.Feature = "ServiceTrafficDistribution"

    // owner: @gjkim42 @SergeyKanzhelev @matthyx @tzneal
    // kep: http://kep.k8s.io/753
    // alpha: v1.28

@@ -1139,6 +1146,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
    ServiceNodePortStaticSubrange: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.29; remove in 1.31

    ServiceTrafficDistribution: {Default: false, PreRelease: featuregate.Alpha},

    SidecarContainers: {Default: true, PreRelease: featuregate.Beta},

    SizeMemoryBackedVolumes: {Default: true, PreRelease: featuregate.Beta},
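Because the gate is alpha in v1.30 and off by default, the new field only takes effect when ServiceTrafficDistribution is explicitly enabled on the relevant components, typically by passing --feature-gates=ServiceTrafficDistribution=true to kube-apiserver, kube-controller-manager, and kube-proxy.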
pkg/generated/openapi/zz_generated.openapi.go (generated, 7 changes)

@@ -29512,6 +29512,13 @@ func schema_k8sio_api_core_v1_ServiceSpec(ref common.ReferenceCallback) common.O
                    Enum: []interface{}{"Cluster", "Local"},
                },
            },
            "trafficDistribution": {
                SchemaProps: spec.SchemaProps{
                    Description: "TrafficDistribution offers a way to express preferences for how traffic is distributed to Service endpoints. Implementations can use this field as a hint, but are not required to guarantee strict adherence. If the field is not set, the implementation will apply its default routing strategy. If set to \"PreferClose\", implementations should prioritize endpoints that are topologically close (e.g., same zone).",
                    Type:        []string{"string"},
                    Format:      "",
                },
            },
        },
    },
@@ -137,21 +137,27 @@ func CategorizeEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels m
 		return
 	}
 
-// canUseTopology returns true if topology aware routing is enabled and properly configured
-// in this cluster. That is, it checks that:
-// * The TopologyAwareHints feature is enabled
-// * The "service.kubernetes.io/topology-aware-hints" annotation on this Service is set to "Auto"
-// * The node's labels include "topology.kubernetes.io/zone"
-// * All of the endpoints for this Service have a topology hint
-// * At least one endpoint for this Service is hinted for this node's zone.
+// canUseTopology returns true if topology aware routing is enabled and properly
+// configured in this cluster. That is, it checks that:
+// - The TopologyAwareHints or ServiceTrafficDistribution feature is enabled.
+// - If ServiceTrafficDistribution feature gate is not enabled, then the
+//   hintsAnnotation should represent an enabled value.
+// - The node's labels include "topology.kubernetes.io/zone".
+// - All of the endpoints for this Service have a topology hint.
+// - At least one endpoint for this Service is hinted for this node's zone.
 func canUseTopology(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[string]string) bool {
-	if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
+	if !utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) && !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
 		return false
 	}
-	// Any non-empty and non-disabled values for the hints annotation are acceptable.
-	hintsAnnotation := svcInfo.HintsAnnotation()
-	if hintsAnnotation == "" || hintsAnnotation == "disabled" || hintsAnnotation == "Disabled" {
-		return false
+
+	// Ignore value of hintsAnnotation if the ServiceTrafficDistribution feature
+	// gate is enabled.
+	if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) {
+		// If the hintsAnnotation has a disabled value, we do not consider hints for route programming.
+		hintsAnnotation := svcInfo.HintsAnnotation()
+		if hintsAnnotation == "" || hintsAnnotation == "disabled" || hintsAnnotation == "Disabled" {
+			return false
+		}
 	}
 
 	zone, ok := nodeLabels[v1.LabelTopologyZone]
@ -47,12 +47,13 @@ func checkExpectedEndpoints(expected sets.Set[string], actual []Endpoint) error
|
||||
|
||||
func TestCategorizeEndpoints(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
hintsEnabled bool
|
||||
pteEnabled bool
|
||||
nodeLabels map[string]string
|
||||
serviceInfo ServicePort
|
||||
endpoints []Endpoint
|
||||
name string
|
||||
hintsEnabled bool
|
||||
trafficDistFeatureEnabled bool
|
||||
pteEnabled bool
|
||||
nodeLabels map[string]string
|
||||
serviceInfo ServicePort
|
||||
endpoints []Endpoint
|
||||
|
||||
// We distinguish `nil` ("service doesn't use this kind of endpoints") from
|
||||
// `sets.Set[string]()` ("service uses this kind of endpoints but has no endpoints").
|
||||
@ -131,10 +132,39 @@ func TestCategorizeEndpoints(t *testing.T) {
|
||||
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.5:80", "10.1.2.6:80"),
|
||||
localEndpoints: nil,
|
||||
}, {
|
||||
name: "externalTrafficPolicy: Local, topology ignored for Local endpoints",
|
||||
hintsEnabled: true,
|
||||
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
|
||||
serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
|
||||
name: "hints, hints annotation empty but trafficDist feature gate enabled, hints are not ignored",
|
||||
hintsEnabled: true,
|
||||
trafficDistFeatureEnabled: true,
|
||||
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
|
||||
serviceInfo: &BaseServicePortInfo{},
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), ready: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), ready: true},
|
||||
},
|
||||
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
|
||||
localEndpoints: nil,
|
||||
}, {
|
||||
name: "hints disabled, trafficDist feature gate enabled, hints are not ignored",
|
||||
hintsEnabled: false,
|
||||
trafficDistFeatureEnabled: true,
|
||||
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
|
||||
serviceInfo: &BaseServicePortInfo{},
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.5:80", zoneHints: sets.New[string]("zone-c"), ready: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.6:80", zoneHints: sets.New[string]("zone-a"), ready: true},
|
||||
},
|
||||
clusterEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.6:80"),
|
||||
localEndpoints: nil,
|
||||
}, {
|
||||
name: "externalTrafficPolicy: Local, topology ignored for Local endpoints",
|
||||
hintsEnabled: true,
|
||||
trafficDistFeatureEnabled: true,
|
||||
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
|
||||
serviceInfo: &BaseServicePortInfo{externalPolicyLocal: true, nodePort: 8080, hintsAnnotation: "auto"},
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true, isLocal: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true, isLocal: true},
|
||||
@ -145,10 +175,11 @@ func TestCategorizeEndpoints(t *testing.T) {
|
||||
localEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80"),
|
||||
allEndpoints: sets.New[string]("10.1.2.3:80", "10.1.2.4:80", "10.1.2.6:80"),
|
||||
}, {
|
||||
name: "internalTrafficPolicy: Local, topology ignored for Local endpoints",
|
||||
hintsEnabled: true,
|
||||
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
|
||||
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, hintsAnnotation: "auto", externalPolicyLocal: false, nodePort: 8080},
|
||||
name: "internalTrafficPolicy: Local, topology ignored for Local endpoints",
|
||||
hintsEnabled: true,
|
||||
trafficDistFeatureEnabled: true,
|
||||
nodeLabels: map[string]string{v1.LabelTopologyZone: "zone-a"},
|
||||
serviceInfo: &BaseServicePortInfo{internalPolicyLocal: true, hintsAnnotation: "auto", externalPolicyLocal: false, nodePort: 8080},
|
||||
endpoints: []Endpoint{
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.3:80", zoneHints: sets.New[string]("zone-a"), ready: true, isLocal: true},
|
||||
&BaseEndpointInfo{endpoint: "10.1.2.4:80", zoneHints: sets.New[string]("zone-b"), ready: true, isLocal: true},
|
||||
@ -458,6 +489,7 @@ func TestCategorizeEndpoints(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, tc.hintsEnabled)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ServiceTrafficDistribution, tc.trafficDistFeatureEnabled)()
|
||||
|
||||
clusterEndpoints, localEndpoints, allEndpoints, hasAnyEndpoints := CategorizeEndpoints(tc.endpoints, tc.serviceInfo, tc.nodeLabels)
|
||||
|
||||
|
@@ -120,7 +120,11 @@ func (svcStrategy) AllowUnconditionalUpdate() bool {
// newSvc.Spec.MyFeature = nil
// }
func dropServiceDisabledFields(newSvc *api.Service, oldSvc *api.Service) {

    // Drop condition for TrafficDistribution field.
    isTrafficDistributionInUse := (oldSvc != nil && oldSvc.Spec.TrafficDistribution != nil)
    if !utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution) && !isTrafficDistributionInUse {
        newSvc.Spec.TrafficDistribution = nil
    }
}

type serviceStatusStrategy struct {
staging/src/k8s.io/api/core/v1/generated.pb.go (generated, 1971 changes)

File diff suppressed because it is too large.
@@ -5764,6 +5764,15 @@ message ServiceSpec {
  // (possibly modified by topology and other features).
  // +optional
  optional string internalTrafficPolicy = 22;

  // TrafficDistribution offers a way to express preferences for how traffic is
  // distributed to Service endpoints. Implementations can use this field as a
  // hint, but are not required to guarantee strict adherence. If the field is
  // not set, the implementation will apply its default routing strategy. If set
  // to "PreferClose", implementations should prioritize endpoints that are
  // topologically close (e.g., same zone).
  // +optional
  optional string trafficDistribution = 23;
}

// ServiceStatus represents the current status of a service.
@@ -4906,6 +4906,18 @@ const (
    ServiceExternalTrafficPolicyTypeCluster = ServiceExternalTrafficPolicyCluster
)

// These are valid values for the TrafficDistribution field of a Service.
const (
    // Indicates a preference for routing traffic to endpoints that are
    // topologically proximate to the client. The interpretation of "topologically
    // proximate" may vary across implementations and could encompass endpoints
    // within the same node, rack, zone, or even region. Setting this value gives
    // implementations permission to make different tradeoffs, e.g. optimizing for
    // proximity rather than equal distribution of load. Users should not set this
    // value if such tradeoffs are not acceptable.
    ServiceTrafficDistributionPreferClose = "PreferClose"
)

// These are the valid conditions of a service.
const (
    // LoadBalancerPortsError represents the condition of the requested ports

@@ -5250,6 +5262,15 @@ type ServiceSpec struct {
    // (possibly modified by topology and other features).
    // +optional
    InternalTrafficPolicy *ServiceInternalTrafficPolicy `json:"internalTrafficPolicy,omitempty" protobuf:"bytes,22,opt,name=internalTrafficPolicy"`

    // TrafficDistribution offers a way to express preferences for how traffic is
    // distributed to Service endpoints. Implementations can use this field as a
    // hint, but are not required to guarantee strict adherence. If the field is
    // not set, the implementation will apply its default routing strategy. If set
    // to "PreferClose", implementations should prioritize endpoints that are
    // topologically close (e.g., same zone).
    // +optional
    TrafficDistribution *string `json:"trafficDistribution,omitempty" protobuf:"bytes,23,opt,name=trafficDistribution"`
}

// ServicePort contains information on service's port.
@@ -2387,6 +2387,7 @@ var map_ServiceSpec = map[string]string{
    "allocateLoadBalancerNodePorts": "allocateLoadBalancerNodePorts defines if NodePorts will be automatically allocated for services with type LoadBalancer. Default is \"true\". It may be set to \"false\" if the cluster load-balancer does not rely on NodePorts. If the caller requests specific NodePorts (by specifying a value), those requests will be respected, regardless of this field. This field may only be set for services with type LoadBalancer and will be cleared if the type is changed to any other type.",
    "loadBalancerClass": "loadBalancerClass is the class of the load balancer implementation this Service belongs to. If specified, the value of this field must be a label-style identifier, with an optional prefix, e.g. \"internal-vip\" or \"example.com/internal-vip\". Unprefixed names are reserved for end-users. This field can only be set when the Service type is 'LoadBalancer'. If not set, the default load balancer implementation is used, today this is typically done through the cloud provider integration, but should apply for any default implementation. If set, it is assumed that a load balancer implementation is watching for Services with a matching class. Any default load balancer implementation (e.g. cloud providers) should ignore Services that set this field. This field can only be set when creating or updating a Service to type 'LoadBalancer'. Once set, it can not be changed. This field will be wiped when a service is updated to a non 'LoadBalancer' type.",
    "internalTrafficPolicy": "InternalTrafficPolicy describes how nodes distribute service traffic they receive on the ClusterIP. If set to \"Local\", the proxy will assume that pods only want to talk to endpoints of the service on the same node as the pod, dropping the traffic if there are no local endpoints. The default value, \"Cluster\", uses the standard behavior of routing to all endpoints evenly (possibly modified by topology and other features).",
    "trafficDistribution": "TrafficDistribution offers a way to express preferences for how traffic is distributed to Service endpoints. Implementations can use this field as a hint, but are not required to guarantee strict adherence. If the field is not set, the implementation will apply its default routing strategy. If set to \"PreferClose\", implementations should prioritize endpoints that are topologically close (e.g., same zone).",
}

func (ServiceSpec) SwaggerDoc() map[string]string {
@@ -5692,6 +5692,11 @@ func (in *ServiceSpec) DeepCopyInto(out *ServiceSpec) {
        *out = new(ServiceInternalTrafficPolicy)
        **out = **in
    }
    if in.TrafficDistribution != nil {
        in, out := &in.TrafficDistribution, &out.TrafficDistribution
        *out = new(string)
        **out = **in
    }
    return
}
@@ -85,7 +85,8 @@
     "ipFamilyPolicy": "ipFamilyPolicyValue",
     "allocateLoadBalancerNodePorts": true,
     "loadBalancerClass": "loadBalancerClassValue",
-    "internalTrafficPolicy": "internalTrafficPolicyValue"
+    "internalTrafficPolicy": "internalTrafficPolicyValue",
+    "trafficDistribution": "trafficDistributionValue"
   },
   "status": {
     "loadBalancer": {

Binary file not shown.
@@ -64,6 +64,7 @@ spec:
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 1
  trafficDistribution: trafficDistributionValue
  type: typeValue
status:
  conditions:
@@ -44,6 +44,7 @@ type ServiceSpecApplyConfiguration struct {
    AllocateLoadBalancerNodePorts *bool                                `json:"allocateLoadBalancerNodePorts,omitempty"`
    LoadBalancerClass             *string                              `json:"loadBalancerClass,omitempty"`
    InternalTrafficPolicy         *corev1.ServiceInternalTrafficPolicy `json:"internalTrafficPolicy,omitempty"`
    TrafficDistribution           *string                              `json:"trafficDistribution,omitempty"`
}

// ServiceSpecApplyConfiguration constructs an declarative configuration of the ServiceSpec type for use with

@@ -222,3 +223,11 @@ func (b *ServiceSpecApplyConfiguration) WithInternalTrafficPolicy(value corev1.S
    b.InternalTrafficPolicy = &value
    return b
}

// WithTrafficDistribution sets the TrafficDistribution field in the declarative configuration to the given value
// and returns the receiver, so that objects can be built by chaining "With" function invocations.
// If called multiple times, the TrafficDistribution field is set to the value of the last call.
func (b *ServiceSpecApplyConfiguration) WithTrafficDistribution(value string) *ServiceSpecApplyConfiguration {
    b.TrafficDistribution = &value
    return b
}
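As an illustration of the generated builder above (a sketch, not part of this PR's diff): the snippet assumes a standard typed clientset and server-side apply; the field manager, namespace, and Service name are placeholders.

    package example

    import (
        "context"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
        "k8s.io/client-go/kubernetes"
    )

    // applyPreferClose uses server-side apply to set only spec.trafficDistribution
    // on an existing Service, via the WithTrafficDistribution builder added above.
    func applyPreferClose(ctx context.Context, cs kubernetes.Interface, namespace, name string) error {
        svc := corev1ac.Service(name, namespace).
            WithSpec(corev1ac.ServiceSpec().
                WithTrafficDistribution("PreferClose"))
        _, err := cs.CoreV1().Services(namespace).Apply(ctx, svc, metav1.ApplyOptions{
            FieldManager: "trafficdist-example", // placeholder field manager
        })
        return err
    }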
@@ -7546,6 +7546,9 @@ var schemaYAML = typed.YAMLObject(`types:
    - name: sessionAffinityConfig
      type:
        namedType: io.k8s.api.core.v1.SessionAffinityConfig
    - name: trafficDistribution
      type:
        scalar: string
    - name: type
      type:
        scalar: string
@@ -20,6 +20,7 @@ import (
    "math"
    "sync"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/types"
    endpointsliceutil "k8s.io/endpointslice/util"
)

@@ -27,8 +28,9 @@ import (
 // NewCache returns a new Cache with the specified endpointsPerSlice.
 func NewCache(endpointsPerSlice int32) *Cache {
 	return &Cache{
-		maxEndpointsPerSlice: endpointsPerSlice,
-		cache:                map[types.NamespacedName]*ServicePortCache{},
+		maxEndpointsPerSlice:          endpointsPerSlice,
+		cache:                         map[types.NamespacedName]*ServicePortCache{},
+		servicesByTrafficDistribution: make(map[string]map[types.NamespacedName]bool),
 	}
 }

@@ -40,7 +42,7 @@ type Cache struct {
 	maxEndpointsPerSlice int32

 	// lock protects changes to numEndpoints, numSlicesActual, numSlicesDesired,
-	// and cache.
+	// cache and servicesByTrafficDistribution
 	lock sync.Mutex
 	// numEndpoints represents the total number of endpoints stored in
 	// EndpointSlices.

@@ -52,8 +54,18 @@ type Cache struct {
    // cache stores a ServicePortCache grouped by NamespacedNames representing
    // Services.
    cache map[types.NamespacedName]*ServicePortCache
    // Tracks all services partitioned by their trafficDistribution field.
    //
    // The type should be read as map[trafficDistribution]setOfServices
    servicesByTrafficDistribution map[string]map[types.NamespacedName]bool
}

const (
    // Label value for cases when service.spec.trafficDistribution is set to an
    // unknown value.
    trafficDistributionImplementationSpecific = "ImplementationSpecific"
)

// ServicePortCache tracks values for total numbers of desired endpoints as well
// as the efficiency of EndpointSlice endpoints distribution for each unique
// Service Port combination.

@@ -124,12 +136,46 @@ func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *
    c.updateMetrics()
}

func (c *Cache) UpdateTrafficDistributionForService(serviceNN types.NamespacedName, trafficDistributionPtr *string) {
    c.lock.Lock()
    defer c.lock.Unlock()

    defer c.updateMetrics()

    for _, serviceSet := range c.servicesByTrafficDistribution {
        delete(serviceSet, serviceNN)
    }

    if trafficDistributionPtr == nil {
        return
    }

    trafficDistribution := *trafficDistributionPtr
    // If we don't explicitly recognize a value for trafficDistribution, it should
    // be treated as an implementation specific value. All such implementation
    // specific values should use the label value "ImplementationSpecific" to not
    // explode the metric labels cardinality.
    if trafficDistribution != corev1.ServiceTrafficDistributionPreferClose {
        trafficDistribution = trafficDistributionImplementationSpecific
    }
    serviceSet, ok := c.servicesByTrafficDistribution[trafficDistribution]
    if !ok {
        serviceSet = make(map[types.NamespacedName]bool)
        c.servicesByTrafficDistribution[trafficDistribution] = serviceSet
    }
    serviceSet[serviceNN] = true
}

// DeleteService removes references of a Service from the global cache and
// updates the corresponding metrics.
func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
    c.lock.Lock()
    defer c.lock.Unlock()

    for _, serviceSet := range c.servicesByTrafficDistribution {
        delete(serviceSet, serviceNN)
    }

    if spCache, ok := c.cache[serviceNN]; ok {
        actualSlices, desiredSlices, endpoints := spCache.totals(int(c.maxEndpointsPerSlice))
        c.numEndpoints = c.numEndpoints - endpoints

@@ -137,7 +183,6 @@ func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
        c.numSlicesActual -= actualSlices
        c.updateMetrics()
        delete(c.cache, serviceNN)
    }
}

@@ -147,6 +192,11 @@ func (c *Cache) updateMetrics() {
    NumEndpointSlices.WithLabelValues().Set(float64(c.numSlicesActual))
    DesiredEndpointSlices.WithLabelValues().Set(float64(c.numSlicesDesired))
    EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))

    ServicesCountByTrafficDistribution.Reset()
    for trafficDistribution, services := range c.servicesByTrafficDistribution {
        ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution).Set(float64(len(services)))
    }
}

// numDesiredSlices calculates the number of EndpointSlices that would exist
@ -20,6 +20,8 @@ import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
endpointsliceutil "k8s.io/endpointslice/util"
|
||||
@ -89,6 +91,96 @@ func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the mutations to servicesByTrafficDistribution field within Cache
|
||||
// object.
|
||||
func TestCache_ServicesByTrafficDistribution(t *testing.T) {
|
||||
cache := NewCache(0)
|
||||
|
||||
service1 := types.NamespacedName{Namespace: "ns1", Name: "service1"}
|
||||
service2 := types.NamespacedName{Namespace: "ns1", Name: "service2"}
|
||||
service3 := types.NamespacedName{Namespace: "ns2", Name: "service3"}
|
||||
service4 := types.NamespacedName{Namespace: "ns3", Name: "service4"}
|
||||
|
||||
// Define helper function for assertion
|
||||
mustHaveServicesByTrafficDistribution := func(wantServicesByTrafficDistribution map[string]map[types.NamespacedName]bool, desc string) {
|
||||
t.Helper()
|
||||
gotServicesByTrafficDistribution := cache.servicesByTrafficDistribution
|
||||
if diff := cmp.Diff(wantServicesByTrafficDistribution, gotServicesByTrafficDistribution); diff != "" {
|
||||
t.Fatalf("UpdateTrafficDistributionForService(%v) resulted in unexpected diff for cache.servicesByTrafficDistribution; (-want, +got)\n%v", desc, diff)
|
||||
}
|
||||
}
|
||||
|
||||
// Mutate and make assertions
|
||||
|
||||
desc := "service1 starts using trafficDistribution=PreferClose"
|
||||
cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {service1: true},
|
||||
}, desc)
|
||||
|
||||
desc = "service1 starts using trafficDistribution=PreferClose, retries of similar mutation should be idempotent"
|
||||
cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta
|
||||
corev1.ServiceTrafficDistributionPreferClose: {service1: true},
|
||||
}, desc)
|
||||
|
||||
desc = "service2 starts using trafficDistribution=PreferClose"
|
||||
cache.UpdateTrafficDistributionForService(service2, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true}, // Delta
|
||||
}, desc)
|
||||
|
||||
desc = "service3 starts using trafficDistribution=InvalidValue"
|
||||
cache.UpdateTrafficDistributionForService(service3, ptrTo("InvalidValue"))
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true},
|
||||
trafficDistributionImplementationSpecific: {service3: true}, // Delta
|
||||
}, desc)
|
||||
|
||||
desc = "service4 starts using trafficDistribution=nil"
|
||||
cache.UpdateTrafficDistributionForService(service4, nil)
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta
|
||||
corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true},
|
||||
trafficDistributionImplementationSpecific: {service3: true},
|
||||
}, desc)
|
||||
|
||||
desc = "service2 transitions trafficDistribution: PreferClose -> InvalidValue"
|
||||
cache.UpdateTrafficDistributionForService(service2, ptrTo("InvalidValue"))
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {service1: true}, // Delta
|
||||
trafficDistributionImplementationSpecific: {service3: true, service2: true}, // Delta
|
||||
}, desc)
|
||||
|
||||
desc = "service3 gets deleted"
|
||||
cache.DeleteService(service3)
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {service1: true},
|
||||
trafficDistributionImplementationSpecific: {service2: true}, // Delta
|
||||
}, desc)
|
||||
|
||||
desc = "service1 transitions trafficDistribution: PreferClose -> empty"
|
||||
cache.UpdateTrafficDistributionForService(service1, ptrTo(""))
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {}, // Delta
|
||||
trafficDistributionImplementationSpecific: {service1: true, service2: true}, // Delta
|
||||
}, desc)
|
||||
|
||||
desc = "service1 transitions trafficDistribution: InvalidValue -> nil"
|
||||
cache.UpdateTrafficDistributionForService(service1, nil)
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {},
|
||||
trafficDistributionImplementationSpecific: {service2: true}, // Delta
|
||||
}, desc)
|
||||
|
||||
desc = "service2 transitions trafficDistribution: InvalidValue -> nil"
|
||||
cache.UpdateTrafficDistributionForService(service2, nil)
|
||||
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
|
||||
corev1.ServiceTrafficDistributionPreferClose: {},
|
||||
trafficDistributionImplementationSpecific: {}, // Delta
|
||||
}, desc)
|
||||
|
||||
}
|
||||
|
||||
func benchmarkUpdateServicePortCache(b *testing.B, num int) {
|
||||
c := NewCache(int32(100))
|
||||
ns := "benchmark"
|
||||
@ -132,3 +224,7 @@ func BenchmarkUpdateServicePortCache10000(b *testing.B) {
|
||||
func BenchmarkUpdateServicePortCache100000(b *testing.B) {
|
||||
benchmarkUpdateServicePortCache(b, 100000)
|
||||
}
|
||||
|
||||
func ptrTo[T any](obj T) *T {
|
||||
return &obj
|
||||
}
|
||||
|
@@ -102,7 +102,10 @@ var (
 			Name: "endpointslices_changed_per_sync",
 			Help: "Number of EndpointSlices changed on each Service sync",
 		},
-		[]string{"topology"}, // either "Auto" or "Disabled"
+		[]string{
+			"topology",             // either "Auto" or "Disabled"
+			"traffic_distribution", // "PreferClose" or <empty>
+		},
 	)

 // EndpointSliceSyncs tracks the number of sync operations the controller

@@ -116,6 +119,18 @@ var (
        },
        []string{"result"}, // either "success", "stale", or "error"
    )

    // ServicesCountByTrafficDistribution tracks the number of Services using some
    // specific trafficDistribution
    ServicesCountByTrafficDistribution = metrics.NewGaugeVec(
        &metrics.GaugeOpts{
            Subsystem:      EndpointSliceSubsystem,
            Name:           "services_count_by_traffic_distribution",
            Help:           "Number of Services using some specific trafficDistribution",
            StabilityLevel: metrics.ALPHA,
        },
        []string{"traffic_distribution"}, // One of ["PreferClose", "ImplementationSpecific"]
    )
)

var registerMetrics sync.Once

@@ -131,5 +146,6 @@ func RegisterMetrics() {
        legacyregistry.MustRegister(EndpointSliceChanges)
        legacyregistry.MustRegister(EndpointSlicesChangedPerSync)
        legacyregistry.MustRegister(EndpointSliceSyncs)
        legacyregistry.MustRegister(ServicesCountByTrafficDistribution)
    })
}
@@ -35,6 +35,7 @@ import (
    "k8s.io/client-go/tools/record"
    "k8s.io/endpointslice/metrics"
    "k8s.io/endpointslice/topologycache"
    "k8s.io/endpointslice/trafficdist"
    endpointsliceutil "k8s.io/endpointslice/util"
    "k8s.io/klog/v2"
)

@@ -50,11 +51,24 @@ type Reconciler struct {
    // topologyCache tracks the distribution of Nodes and endpoints across zones
    // to enable TopologyAwareHints.
    topologyCache *topologycache.TopologyCache
    // trafficDistributionEnabled determines if the trafficDistribution field is to
    // be considered when reconciling EndpointSlice hints.
    trafficDistributionEnabled bool
    // eventRecorder allows Reconciler to record and publish events.
    eventRecorder record.EventRecorder
    controllerName string
}

type ReconcilerOption func(*Reconciler)

// WithTrafficDistributionEnabled controls whether the Reconciler considers the
// `trafficDistribution` field while reconciling EndpointSlices.
func WithTrafficDistributionEnabled(enabled bool) ReconcilerOption {
    return func(r *Reconciler) {
        r.trafficDistributionEnabled = enabled
    }
}

// endpointMeta includes the attributes we group slices on, this type helps with
// that logic in Reconciler
type endpointMeta struct {

@@ -261,9 +275,32 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
        Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
    }

    canUseTrafficDistribution := r.trafficDistributionEnabled && !hintsEnabled(service.Annotations)

    // Check if we need to add/remove hints based on the topology annotation.
    //
    // This if/else clause can be removed once the annotation has been deprecated.
    // Ref: https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/4444-service-routing-preference
    if r.topologyCache != nil && hintsEnabled(service.Annotations) {
        // Reaching this point means that we need to configure hints based on the
        // topology annotation.
        slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(logger, si)

    } else {
        // Reaching this point means that we will not be configuring hints based on
        // the topology annotation. We need to do 2 things:
        // 1. If hints were added previously based on the annotation, we need to
        //    clear up any locally cached hints from the topologyCache object.
        // 2. Optionally remove the actual hints from the EndpointSlice if we know
        //    that the `trafficDistribution` field is also NOT being used. In other
        //    words, if we know that the `trafficDistribution` field has been
        //    correctly configured by the customer, we DO NOT remove the hints and
        //    wait for the trafficDist handlers to correctly configure them. Always
        //    unconditionally removing hints here (and letting them get readded by
        //    the trafficDist) adds extra overhead in the form of DeepCopy (done
        //    within topologyCache.RemoveHints)

        // Check 1.
        if r.topologyCache != nil {
            if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
                logger.Info("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)

@@ -275,8 +312,20 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
 			}
 			r.topologyCache.RemoveHints(si.ServiceKey, addressType)
 		}
-		slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
+
+		// Check 2.
+		if !canUseTrafficDistribution {
+			slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
+		}
 	}

+	if canUseTrafficDistribution {
+		r.metricsCache.UpdateTrafficDistributionForService(serviceNN, service.Spec.TrafficDistribution)
+		slicesToCreate, slicesToUpdate, _ = trafficdist.ReconcileHints(service.Spec.TrafficDistribution, slicesToCreate, slicesToUpdate, unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete))
+	} else {
+		r.metricsCache.UpdateTrafficDistributionForService(serviceNN, nil)
+	}
+
 	err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
 	if err != nil {
 		errs = append(errs, err)

@@ -288,8 +337,8 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
 }

-func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, eventRecorder record.EventRecorder, controllerName string) *Reconciler {
-	return &Reconciler{
+func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, eventRecorder record.EventRecorder, controllerName string, options ...ReconcilerOption) *Reconciler {
+	r := &Reconciler{
 		client:               client,
 		nodeLister:           nodeLister,
 		maxEndpointsPerSlice: maxEndpointsPerSlice,

@@ -299,6 +348,10 @@ func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister
 		eventRecorder:        eventRecorder,
 		controllerName:       controllerName,
 	}
+	for _, option := range options {
+		option(r)
+	}
+	return r
 }

 // placeholderSliceCompare is a conversion func for comparing two placeholder endpoint slices.

@@ -401,9 +454,15 @@ func (r *Reconciler) finalize(
 	if r.topologyCache != nil && hintsEnabled(service.Annotations) {
 		topologyLabel = "Auto"
 	}
+	var trafficDistribution string
+	if r.trafficDistributionEnabled && !hintsEnabled(service.Annotations) {
+		if service.Spec.TrafficDistribution != nil && *service.Spec.TrafficDistribution == corev1.ServiceTrafficDistributionPreferClose {
+			trafficDistribution = *service.Spec.TrafficDistribution
+		}
+	}

 	numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete)
-	metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged))
+	metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel, trafficDistribution).Observe(float64(numSlicesChanged))

 	return nil
 }
@ -24,6 +24,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
@ -1972,6 +1973,215 @@ func TestReconcileTopology(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test reconciliation behaviour for trafficDistribution field.
|
||||
func TestReconcile_TrafficDistribution(t *testing.T) {
|
||||
// Setup the following topology for the test.
|
||||
//
|
||||
// - node-0 IN zone-a CONTAINS {pod-0}
|
||||
// - node-1 IN zone-b CONTAINS {pod-1, pod-2, pod-3}
|
||||
// - node-2 IN zone-c CONTAINS {pod-4, pod-5}
|
||||
ns := "ns1"
|
||||
svc, _ := newServiceAndEndpointMeta("foo", ns)
|
||||
nodes := []*corev1.Node{}
|
||||
pods := []*corev1.Pod{}
|
||||
for i := 0; i < 3; i++ {
|
||||
nodes = append(nodes, &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("node-%v", i),
|
||||
Labels: map[string]string{
|
||||
corev1.LabelTopologyZone: fmt.Sprintf("zone-%c", 'a'+i),
|
||||
},
|
||||
},
|
||||
Status: corev1.NodeStatus{
|
||||
Conditions: []corev1.NodeCondition{{
|
||||
Type: corev1.NodeReady,
|
||||
Status: corev1.ConditionTrue,
|
||||
}},
|
||||
Allocatable: corev1.ResourceList{"cpu": resource.MustParse("100m")},
|
||||
},
|
||||
})
|
||||
}
|
||||
for i := 0; i < 6; i++ {
|
||||
pods = append(pods, newPod(i, ns, true, 1, false))
|
||||
}
|
||||
pods[0].Spec.NodeName = nodes[0].Name
|
||||
pods[1].Spec.NodeName = nodes[1].Name
|
||||
pods[2].Spec.NodeName = nodes[1].Name
|
||||
pods[3].Spec.NodeName = nodes[1].Name
|
||||
pods[4].Spec.NodeName = nodes[2].Name
|
||||
pods[5].Spec.NodeName = nodes[2].Name
|
||||
|
||||
// Define test cases.
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
desc string
|
||||
|
||||
trafficDistributionFeatureGateEnabled bool
|
||||
trafficDistribution string
|
||||
topologyAnnotation string
|
||||
|
||||
// Defines how many hints belong to a particular zone.
|
||||
wantHintsDistributionByZone map[string]int
|
||||
// Number of endpoints where the zone hints are different from the zone of
|
||||
// the endpoint itself.
|
||||
wantEndpointsWithCrossZoneHints int
|
||||
wantMetrics expectedMetrics
|
||||
}{
|
||||
{
|
||||
name: "trafficDistribution=PreferClose, topologyAnnotation=Disabled",
|
||||
desc: "When trafficDistribution is enabled and topologyAnnotation is disabled, hints should be distributed as per the trafficDistribution field",
|
||||
trafficDistributionFeatureGateEnabled: true,
|
||||
trafficDistribution: corev1.ServiceTrafficDistributionPreferClose,
|
||||
topologyAnnotation: "Disabled",
|
||||
wantHintsDistributionByZone: map[string]int{
|
||||
"zone-a": 1, // {pod-0}
|
||||
"zone-b": 3, // {pod-1, pod-2, pod-3}
|
||||
"zone-c": 2, // {pod-4, pod-5}
|
||||
},
|
||||
wantMetrics: expectedMetrics{
|
||||
desiredSlices: 1,
|
||||
actualSlices: 1,
|
||||
desiredEndpoints: 6,
|
||||
addedPerSync: 6,
|
||||
removedPerSync: 0,
|
||||
numCreated: 1,
|
||||
numUpdated: 0,
|
||||
numDeleted: 0,
|
||||
slicesChangedPerSync: 0, // 0 means either topologyAnnotation or trafficDistribution was used.
|
||||
slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used.
|
||||
slicesChangedPerSyncTrafficDist: 1, // 1 EPS configured using trafficDistribution.
|
||||
servicesCountByTrafficDistribution: map[string]int{
|
||||
"PreferClose": 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "feature gate disabled; trafficDistribution=PreferClose, topologyAnnotation=Disabled",
|
||||
desc: "When feature gate is disabled, trafficDistribution should be ignored",
|
||||
trafficDistributionFeatureGateEnabled: false,
|
||||
trafficDistribution: corev1.ServiceTrafficDistributionPreferClose,
|
||||
topologyAnnotation: "Disabled",
|
||||
wantHintsDistributionByZone: map[string]int{"": 6}, // Equivalent to no hints.
|
||||
wantMetrics: expectedMetrics{
|
||||
desiredSlices: 1,
|
||||
actualSlices: 1,
|
||||
desiredEndpoints: 6,
|
||||
addedPerSync: 6,
|
||||
removedPerSync: 0,
|
||||
numCreated: 1,
|
||||
numUpdated: 0,
|
||||
numDeleted: 0,
|
||||
slicesChangedPerSync: 1, // 1 means both topologyAnnotation and trafficDistribution were not used.
|
||||
slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used.
|
||||
slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "trafficDistribution=PreferClose, topologyAnnotation=Auto",
|
||||
desc: "When trafficDistribution and topologyAnnotation are both enabled, precedence should be given to topologyAnnotation",
|
||||
trafficDistributionFeatureGateEnabled: true,
|
||||
trafficDistribution: corev1.ServiceTrafficDistributionPreferClose,
|
||||
topologyAnnotation: "Auto",
|
||||
wantHintsDistributionByZone: map[string]int{
|
||||
"zone-a": 2, // {pod-0, pod-3} (pod-3 is just an example, it could have also been either of the other two)
|
||||
"zone-b": 2, // {pod-1, pod-2}
|
||||
"zone-c": 2, // {pod-4, pod-5}
|
||||
},
|
||||
wantEndpointsWithCrossZoneHints: 1, // since a pod from zone-b is likely assigned a hint for zone-a
|
||||
wantMetrics: expectedMetrics{
|
||||
desiredSlices: 1,
|
||||
actualSlices: 1,
|
||||
desiredEndpoints: 6,
|
||||
addedPerSync: 6,
|
||||
removedPerSync: 0,
|
||||
numCreated: 1,
|
||||
numUpdated: 0,
|
||||
numDeleted: 0,
|
||||
slicesChangedPerSync: 0, // 0 means either topologyAnnotation or trafficDistribution was used.
|
||||
slicesChangedPerSyncTopology: 1, // 1 EPS configured using topologyAnnotation.
|
||||
slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "trafficDistribution=<empty>, topologyAnnotation=<empty>",
|
||||
desc: "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added, but the servicesCountByTrafficDistribution metric should reflect this",
|
||||
trafficDistributionFeatureGateEnabled: true,
|
||||
trafficDistribution: "",
|
||||
topologyAnnotation: "",
|
||||
wantHintsDistributionByZone: map[string]int{"": 6}, // Equivalent to no hints.
|
||||
wantMetrics: expectedMetrics{
|
||||
desiredSlices: 1,
|
||||
actualSlices: 1,
|
||||
desiredEndpoints: 6,
|
||||
addedPerSync: 6,
|
||||
removedPerSync: 0,
|
||||
numCreated: 1,
|
||||
numUpdated: 0,
|
||||
numDeleted: 0,
|
||||
slicesChangedPerSync: 1, // 1 means both topologyAnnotation and trafficDistribution were not used.
|
||||
slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used.
|
||||
slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
|
||||
servicesCountByTrafficDistribution: map[string]int{
|
||||
"ImplementationSpecific": 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Make assertions.
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
client := newClientset()
|
||||
logger, _ := ktesting.NewTestContext(t)
|
||||
setupMetrics()
|
||||
|
||||
r := newReconciler(client, nodes, defaultMaxEndpointsPerSlice)
|
||||
r.trafficDistributionEnabled = tc.trafficDistributionFeatureGateEnabled
|
||||
r.topologyCache = topologycache.NewTopologyCache()
|
||||
r.topologyCache.SetNodes(logger, nodes)
|
||||
|
||||
service := svc.DeepCopy()
|
||||
service.Spec.TrafficDistribution = &tc.trafficDistribution
|
||||
service.Annotations = map[string]string{
|
||||
corev1.DeprecatedAnnotationTopologyAwareHints: tc.topologyAnnotation,
|
||||
}
|
||||
|
||||
err := r.Reconcile(logger, service, pods, nil, time.Now())
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Reconcile(...): return error = %v; want no error", err)
|
||||
}
|
||||
|
||||
fetchedSlices := fetchEndpointSlices(t, client, ns)
|
||||
gotHintsDistributionByZone := make(map[string]int)
|
||||
gotEndpointsWithCrossZoneHints := 0
|
||||
for _, slice := range fetchedSlices {
|
||||
for _, endpoint := range slice.Endpoints {
|
||||
var zoneHint string
|
||||
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) == 1 {
|
||||
zoneHint = endpoint.Hints.ForZones[0].Name
|
||||
}
|
||||
gotHintsDistributionByZone[zoneHint]++
|
||||
if zoneHint != "" && *endpoint.Zone != zoneHint {
|
||||
gotEndpointsWithCrossZoneHints++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(tc.wantHintsDistributionByZone, gotHintsDistributionByZone); diff != "" {
|
||||
t.Errorf("Reconcile(...): Incorrect distribution of endpoints among zones; (-want, +got)\n%v", diff)
|
||||
}
|
||||
if gotEndpointsWithCrossZoneHints != tc.wantEndpointsWithCrossZoneHints {
|
||||
t.Errorf("Reconcile(...): EndpointSlices have endpoints with incorrect number of cross-zone hints; gotEndpointsWithCrossZoneHints=%v, want=%v", gotEndpointsWithCrossZoneHints, tc.wantEndpointsWithCrossZoneHints)
|
||||
}
|
||||
|
||||
expectMetrics(t, tc.wantMetrics)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Test Helpers

func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *Reconciler {
@ -2112,18 +2322,20 @@ func reconcileHelper(t *testing.T, r *Reconciler, service *corev1.Service, pods
// Metrics helpers

type expectedMetrics struct {
	desiredSlices                int
	actualSlices                 int
	desiredEndpoints             int
	addedPerSync                 int
	removedPerSync               int
	numCreated                   int
	numUpdated                   int
	numDeleted                   int
	slicesChangedPerSync         int
	slicesChangedPerSyncTopology int
	syncSuccesses                int
	syncErrors                   int
	desiredSlices                      int
	actualSlices                       int
	desiredEndpoints                   int
	addedPerSync                       int
	removedPerSync                     int
	numCreated                         int
	numUpdated                         int
	numDeleted                         int
	slicesChangedPerSync               int
	slicesChangedPerSyncTopology       int
	slicesChangedPerSyncTrafficDist    int
	syncSuccesses                      int
	syncErrors                         int
	servicesCountByTrafficDistribution map[string]int
}
func expectMetrics(t *testing.T, em expectedMetrics) {
@ -2177,18 +2389,24 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
		t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted)
	}

	actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled"))
	actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled", ""))
	handleErr(t, err, "slicesChangedPerSync")
	if actualSlicesChangedPerSync != float64(em.slicesChangedPerSync) {
		t.Errorf("Expected slicesChangedPerSync to be %d, got %v", em.slicesChangedPerSync, actualSlicesChangedPerSync)
	}

	actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Auto"))
	actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Auto", ""))
	handleErr(t, err, "slicesChangedPerSyncTopology")
	if actualSlicesChangedPerSyncTopology != float64(em.slicesChangedPerSyncTopology) {
		t.Errorf("Expected slicesChangedPerSyncTopology to be %d, got %v", em.slicesChangedPerSyncTopology, actualSlicesChangedPerSyncTopology)
	}

	actualSlicesChangedPerSyncTrafficDist, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled", "PreferClose"))
	handleErr(t, err, "slicesChangedPerSyncTrafficDist")
	if actualSlicesChangedPerSyncTrafficDist != float64(em.slicesChangedPerSyncTrafficDist) {
		t.Errorf("Expected slicesChangedPerSyncTrafficDist to be %d, got %v", em.slicesChangedPerSyncTrafficDist, actualSlicesChangedPerSyncTrafficDist)
	}

	actualSyncSuccesses, err := testutil.GetCounterMetricValue(metrics.EndpointSliceSyncs.WithLabelValues("success"))
	handleErr(t, err, "syncSuccesses")
	if actualSyncSuccesses != float64(em.syncSuccesses) {
@ -2200,6 +2418,18 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
	if actualSyncErrors != float64(em.syncErrors) {
		t.Errorf("Expected endpointSliceSyncErrors to be %d, got %v", em.syncErrors, actualSyncErrors)
	}

	for _, trafficDistribution := range []string{"PreferClose", "ImplementationSpecific"} {
		gotServicesCount, err := testutil.GetGaugeMetricValue(metrics.ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution))
		var wantServicesCount int
		if em.servicesCountByTrafficDistribution != nil {
			wantServicesCount = em.servicesCountByTrafficDistribution[trafficDistribution]
		}
		handleErr(t, err, fmt.Sprintf("%v[traffic_distribution=%v]", "services_count_by_traffic_distribution", trafficDistribution))
		if int(gotServicesCount) != wantServicesCount {
			t.Errorf("Expected servicesCountByTrafficDistribution for traffic_distribution=%v to be %v, got %v", trafficDistribution, wantServicesCount, gotServicesCount)
		}
	}
}
func handleErr(t *testing.T, err error, metricName string) {
@ -2210,16 +2440,13 @@ func handleErr(t *testing.T, err error, metricName string) {

func setupMetrics() {
	metrics.RegisterMetrics()
	metrics.NumEndpointSlices.Delete(map[string]string{})
	metrics.DesiredEndpointSlices.Delete(map[string]string{})
	metrics.EndpointsDesired.Delete(map[string]string{})
	metrics.EndpointsAddedPerSync.Delete(map[string]string{})
	metrics.EndpointsRemovedPerSync.Delete(map[string]string{})
	metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"})
	metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"})
	metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"})
	metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "Disabled"})
	metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "Auto"})
	metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "success"})
	metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "error"})
	metrics.NumEndpointSlices.Reset()
	metrics.DesiredEndpointSlices.Reset()
	metrics.EndpointsDesired.Reset()
	metrics.EndpointsAddedPerSync.Reset()
	metrics.EndpointsRemovedPerSync.Reset()
	metrics.EndpointSliceChanges.Reset()
	metrics.EndpointSlicesChangedPerSync.Reset()
	metrics.EndpointSliceSyncs.Reset()
	metrics.ServicesCountByTrafficDistribution.Reset()
}
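// Illustrative note (not part of this change): the Delete/Reset calls above and the
// WithLabelValues(...) calls in expectMetrics imply that EndpointSlicesChangedPerSync
// now carries two labels ("topology" and "traffic_distribution") and that
// ServicesCountByTrafficDistribution is a gauge keyed by the trafficDistribution value.
// A hypothetical recording site could therefore look roughly like:
//
//	metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled", "PreferClose").Observe(1)
//	metrics.ServicesCountByTrafficDistribution.WithLabelValues("PreferClose").Set(1)
//
// The real call sites live in the controller code and may differ from this sketch.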
143
staging/src/k8s.io/endpointslice/trafficdist/trafficdist.go
Normal file
@ -0,0 +1,143 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package trafficdist handles reconciliation of hints for the trafficDistribution field.
package trafficdist

import (
	corev1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
)

// ReconcileHints will reconcile hints for the given EndpointSlices.
//
// EndpointSlice resources within slicesUnchanged will not be modified.
func ReconcileHints(trafficDistribution *string, slicesToCreate, slicesToUpdate, slicesUnchanged []*discoveryv1.EndpointSlice) ([]*discoveryv1.EndpointSlice, []*discoveryv1.EndpointSlice, []*discoveryv1.EndpointSlice) {
	var h heuristic = &defaultHeuristic{}

	if trafficDistribution != nil && *trafficDistribution == corev1.ServiceTrafficDistributionPreferClose {
		h = &preferCloseHeuristic{}
	}

	// Identify the unchanged slices that need an update because of a missing or
	// incorrect zone hint.
	//
	// Uses in-place filtering to remove any slices that are no longer
	// unchanged and need to be moved to slicesToUpdate
	// (https://github.com/golang/go/wiki/SliceTricks#filter-in-place)
	j := 0
	for _, slice := range slicesUnchanged {
		if h.needsUpdate(slice) {
			// Unchanged slices are direct copies from the informer cache. We need to
			// deep copy an unchanged slice before making any modifications to it so
			// that we do not modify the slice within the informer cache.
			slicesToUpdate = append(slicesToUpdate, slice.DeepCopy())
		} else {
			slicesUnchanged[j] = slice
			j++
		}
	}
	// Truncate slicesUnchanged so it only includes slices that are still
	// unchanged.
	slicesUnchanged = slicesUnchanged[:j]

	// Add zone hints to all slices that need to be created or updated.
	for _, slice := range slicesToCreate {
		h.update(slice)
	}
	for _, slice := range slicesToUpdate {
		h.update(slice)
	}

	return slicesToCreate, slicesToUpdate, slicesUnchanged
}
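// Illustrative usage (not part of this file): a caller holding the Service's
// spec.trafficDistribution would pass it through unchanged, e.g.
//
//	td := corev1.ServiceTrafficDistributionPreferClose
//	toCreate, toUpdate, unchanged = ReconcileHints(&td, toCreate, toUpdate, unchanged)
//
// When trafficDistribution is nil or set to any value other than "PreferClose",
// defaultHeuristic strips existing zone hints instead of adding them.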
type heuristic interface {
	needsUpdate(*discoveryv1.EndpointSlice) bool
	update(*discoveryv1.EndpointSlice)
}

// endpointReady returns true if an Endpoint has the Ready condition set to
// true.
func endpointReady(endpoint discoveryv1.Endpoint) bool {
	return endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
}

// defaultHeuristic implements cluster-wide routing, hence it will remove any
// hints present in the EndpointSlice.
type defaultHeuristic struct {
}

// needsUpdate returns true if any endpoint in the slice has a zone hint.
func (defaultHeuristic) needsUpdate(slice *discoveryv1.EndpointSlice) bool {
	if slice == nil {
		return false
	}
	for _, endpoint := range slice.Endpoints {
		if endpoint.Hints != nil {
			return true
		}
	}
	return false
}

// update removes zone hints from all endpoints.
func (defaultHeuristic) update(slice *discoveryv1.EndpointSlice) {
	for i := range slice.Endpoints {
		slice.Endpoints[i].Hints = nil
	}
}

// preferCloseHeuristic adds a same-zone topology hint to every ready endpoint.
type preferCloseHeuristic struct {
}

// needsUpdate returns true if any ready endpoint in the slice has a
// missing or incorrect hint.
func (preferCloseHeuristic) needsUpdate(slice *discoveryv1.EndpointSlice) bool {
	if slice == nil {
		return false
	}
	for _, endpoint := range slice.Endpoints {
		if !endpointReady(endpoint) {
			continue
		}
		var zone string
		if endpoint.Zone != nil {
			zone = *endpoint.Zone
		}

		if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
			return true
		}
	}
	return false
}

// update adds a same zone topology hint for all ready endpoints.
func (preferCloseHeuristic) update(slice *discoveryv1.EndpointSlice) {
	for i, endpoint := range slice.Endpoints {
		if !endpointReady(endpoint) {
			continue
		}

		var zone string
		if endpoint.Zone != nil {
			zone = *endpoint.Zone
		}
		slice.Endpoints[i].Hints = &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: zone}}}
	}
}
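For context, a minimal sketch of a Service opting into the new behavior from the client side. The object name, namespace, selector, and port below are illustrative; only the TrafficDistribution field and the corev1.ServiceTrafficDistributionPreferClose constant come from this change, and the reconciler test above suggests the behavior also sits behind a feature gate.

package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/ptr"
)

// exampleService builds a Service that asks implementations to prefer
// topologically close (e.g. same-zone) endpoints.
func exampleService() *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: corev1.ServiceSpec{
			Selector:            map[string]string{"app": "demo"},
			Ports:               []corev1.ServicePort{{Port: 80}},
			TrafficDistribution: ptr.To(corev1.ServiceTrafficDistributionPreferClose),
		},
	}
}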
470
staging/src/k8s.io/endpointslice/trafficdist/trafficdist_test.go
Normal file
@ -0,0 +1,470 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package trafficdist

import (
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	corev1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"
	"k8s.io/utils/ptr"
)

func TestReconcileHints_trafficDistribution_is_PreferClose(t *testing.T) {
	testCases := []struct {
		name string

		trafficDistribution *string
		slicesToCreate      []*discoveryv1.EndpointSlice
		slicesToUpdate      []*discoveryv1.EndpointSlice
		slicesUnchanged     []*discoveryv1.EndpointSlice

		wantSlicesToCreate  []*discoveryv1.EndpointSlice
		wantSlicesToUpdate  []*discoveryv1.EndpointSlice
		wantSlicesUnchanged []*discoveryv1.EndpointSlice
	}{
		{
			name: "should set same zone hints",
			trafficDistribution: ptrTo(corev1.ServiceTrafficDistributionPreferClose),
			slicesToCreate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.1"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.3"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
						{
							Addresses: []string{"10.0.0.4"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
			},
			slicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.5"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
						{
							Addresses: []string{"10.0.0.6"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.7"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
						{
							Addresses: []string{"10.0.0.8"},
							Zone: ptr.To("zone-c"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
			},
			wantSlicesToCreate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.1"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-b"}}},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.3"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
						{
							Addresses: []string{"10.0.0.4"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-b"}}},
						},
					},
				},
			},
			wantSlicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.5"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
						{
							Addresses: []string{"10.0.0.6"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.7"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-b"}}},
						},
						{
							Addresses: []string{"10.0.0.8"},
							Zone: ptr.To("zone-c"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-c"}}},
						},
					},
				},
			},
		},
		{
			name: "incorrect hints should be corrected",
			trafficDistribution: ptrTo(corev1.ServiceTrafficDistributionPreferClose),
			slicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.1"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-b"}}}, // incorrect hint as per new heuristic
						},
					},
				},
			},
			slicesUnchanged: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-c"}}},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.3"},
							Zone: ptr.To("zone-c"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
			},
			wantSlicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.1"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-b"}}},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.3"},
							Zone: ptr.To("zone-c"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-c"}}},
						},
					},
				},
			},
		},
		{
			name: "unready endpoints should not trigger updates",
			trafficDistribution: ptrTo(corev1.ServiceTrafficDistributionPreferClose),
			slicesUnchanged: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(false)}, // endpoint is not ready
						},
					},
				},
			},
			wantSlicesUnchanged: []*discoveryv1.EndpointSlice{ // ... so there should be no updates
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(false)},
						},
					},
				},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			gotSlicesToCreate, gotSlicesToUpdate, gotSlicesUnchanged := ReconcileHints(tc.trafficDistribution, tc.slicesToCreate, tc.slicesToUpdate, tc.slicesUnchanged)

			if diff := cmp.Diff(tc.wantSlicesToCreate, gotSlicesToCreate, cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("ReconcileHints(...) returned unexpected diff in 'slicesToCreate': (-want, +got)\n%v", diff)
			}
			if diff := cmp.Diff(tc.wantSlicesToUpdate, gotSlicesToUpdate, cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("ReconcileHints(...) returned unexpected diff in 'slicesToUpdate': (-want, +got)\n%v", diff)
			}
			if diff := cmp.Diff(tc.wantSlicesUnchanged, gotSlicesUnchanged, cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("ReconcileHints(...) returned unexpected diff in 'slicesUnchanged': (-want, +got)\n%v", diff)
			}
		})
	}
}

func TestReconcileHints_trafficDistribution_is_nil_or_empty(t *testing.T) {
	testCases := []struct {
		name string

		trafficDistribution *string
		slicesToCreate      []*discoveryv1.EndpointSlice
		slicesToUpdate      []*discoveryv1.EndpointSlice
		slicesUnchanged     []*discoveryv1.EndpointSlice

		wantSlicesToCreate  []*discoveryv1.EndpointSlice
		wantSlicesToUpdate  []*discoveryv1.EndpointSlice
		wantSlicesUnchanged []*discoveryv1.EndpointSlice
	}{
		{
			name: "trafficDistribution='' should remove zone hints",
			trafficDistribution: ptrTo(""),
			slicesToCreate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.1"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-b"}}},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.3"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
					},
				},
			},
			slicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.5"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
					},
				},
			},
			wantSlicesToCreate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.1"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
						{
							Addresses: []string{"10.0.0.2"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.3"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
			},
			wantSlicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.5"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
			},
		},
		{
			name: "trafficDistribution=nil should remove zone hints",
			trafficDistribution: nil,
			slicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.5"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-a"}}},
						},
					},
				},
			},
			slicesUnchanged: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.6"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
							Hints: &discoveryv1.EndpointHints{ForZones: []discoveryv1.ForZone{{Name: "zone-b"}}},
						},
					},
				},
			},
			wantSlicesToUpdate: []*discoveryv1.EndpointSlice{
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.5"},
							Zone: ptr.To("zone-a"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
				{
					Endpoints: []discoveryv1.Endpoint{
						{
							Addresses: []string{"10.0.0.6"},
							Zone: ptr.To("zone-b"),
							Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
						},
					},
				},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			gotSlicesToCreate, gotSlicesToUpdate, gotSlicesUnchanged := ReconcileHints(tc.trafficDistribution, tc.slicesToCreate, tc.slicesToUpdate, tc.slicesUnchanged)

			if diff := cmp.Diff(tc.wantSlicesToCreate, gotSlicesToCreate, cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("ReconcileHints(...) returned unexpected diff in 'slicesToCreate': (-want, +got)\n%v", diff)
			}
			if diff := cmp.Diff(tc.wantSlicesToUpdate, gotSlicesToUpdate, cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("ReconcileHints(...) returned unexpected diff in 'slicesToUpdate': (-want, +got)\n%v", diff)
			}
			if diff := cmp.Diff(tc.wantSlicesUnchanged, gotSlicesUnchanged, cmpopts.EquateEmpty()); diff != "" {
				t.Errorf("ReconcileHints(...) returned unexpected diff in 'slicesUnchanged': (-want, +got)\n%v", diff)
			}
		})
	}
}

// Ensure that the EndpointSlice objects within `slicesUnchanged` don't get modified.
func TestReconcileHints_doesNotMutateUnchangedSlices(t *testing.T) {
	originalEps := &discoveryv1.EndpointSlice{
		Endpoints: []discoveryv1.Endpoint{
			{
				Addresses: []string{"10.0.0.1"},
				Zone: ptr.To("zone-a"),
				Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
			},
		},
	}
	clonedEps := originalEps.DeepCopy()

	// originalEps should not get modified.
	ReconcileHints(ptrTo(corev1.ServiceTrafficDistributionPreferClose), nil, nil, []*discoveryv1.EndpointSlice{originalEps})
	if diff := cmp.Diff(clonedEps, originalEps); diff != "" {
		t.Errorf("ReconcileHints(...) modified objects within slicesUnchanged, want objects within slicesUnchanged to remain unmodified: (-want, +got)\n%v", diff)
	}
}

func ptrTo[T any](obj T) *T {
	return &obj
}