diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index 01bcd0baa0e..687e08b9fc2 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -173,6 +173,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer, c.maxEndpointsPerSlice, c.endpointSliceTracker, c.topologyCache, + utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution), c.eventRecorder, controllerName, ) diff --git a/staging/src/k8s.io/endpointslice/metrics/metrics.go b/staging/src/k8s.io/endpointslice/metrics/metrics.go index 6a166703be0..977142fa067 100644 --- a/staging/src/k8s.io/endpointslice/metrics/metrics.go +++ b/staging/src/k8s.io/endpointslice/metrics/metrics.go @@ -102,7 +102,10 @@ var ( Name: "endpointslices_changed_per_sync", Help: "Number of EndpointSlices changed on each Service sync", }, - []string{"topology"}, // either "Auto" or "Disabled" + []string{ + "topology", // either "Auto" or "Disabled" + "traffic_distribution", // "PreferClose" or "" (when the field is not in use) + }, ) // EndpointSliceSyncs tracks the number of sync operations the controller diff --git a/staging/src/k8s.io/endpointslice/reconciler.go b/staging/src/k8s.io/endpointslice/reconciler.go index d1f59af8ce3..417666e098f 100644 --- a/staging/src/k8s.io/endpointslice/reconciler.go +++ b/staging/src/k8s.io/endpointslice/reconciler.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/endpointslice/metrics" "k8s.io/endpointslice/topologycache" + "k8s.io/endpointslice/trafficdist" endpointsliceutil "k8s.io/endpointslice/util" "k8s.io/klog/v2" ) @@ -50,6 +51,9 @@ type Reconciler struct { // topologyCache tracks the distribution of Nodes and endpoints across zones // to enable TopologyAwareHints. 
topologyCache *topologycache.TopologyCache + // trafficDistributionEnabled determines if trafficDistribution field is to + // be considered when reconciling EndpointSlice hints. + trafficDistributionEnabled bool // eventRecorder allows Reconciler to record and publish events. eventRecorder record.EventRecorder controllerName string @@ -261,9 +265,32 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete), } + canUseTrafficDistribution := r.trafficDistributionEnabled && !hintsEnabled(service.Annotations) + + // Check if we need to add/remove hints based on the topology annotation. + // + // This if/else clause can be removed once the annotation has been deprecated. + // Ref: https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/4444-service-routing-preference if r.topologyCache != nil && hintsEnabled(service.Annotations) { + // Reaching this point means that we need to configure hints based on the + // topology annotation. slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(logger, si) + } else { + // Reaching this point means that we will not be configuring hints based on + // the topology annotation. We need to do 2 things: + // 1. If hints were added previously based on the annotation, we need to + // clear up any locally cached hints from the topologyCache object. + // 2. Optionally remove the actual hints from the EndpointSlice if we know + // that the `trafficDistribution` field is also NOT being used. In other + // words, if we know that the `trafficDistribution` field has been + // correctly configured by the customer, we DO NOT remove the hints and + // wait for the trafficDist handlers to correctly configure them. Always + // unconditionally removing hints here (and letting them get readded by + // the trafficDist) adds extra overhead in the form of DeepCopy (done + // within topologyCache.RemoveHints) + + // Check 1. 
if r.topologyCache != nil { if r.topologyCache.HasPopulatedHints(si.ServiceKey) { logger.Info("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType) @@ -275,8 +302,17 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. } r.topologyCache.RemoveHints(si.ServiceKey, addressType) } - slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si) + + // Check 2. + if !canUseTrafficDistribution { + slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si) + } } + + if canUseTrafficDistribution { + slicesToCreate, slicesToUpdate, _ = trafficdist.ReconcileHints(service.Spec.TrafficDistribution, slicesToCreate, slicesToUpdate, unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete)) + } + err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime) if err != nil { errs = append(errs, err) @@ -288,16 +324,17 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. 
} -func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, eventRecorder record.EventRecorder, controllerName string) *Reconciler { +func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, trafficDistributionEnabled bool, eventRecorder record.EventRecorder, controllerName string) *Reconciler { return &Reconciler{ - client: client, - nodeLister: nodeLister, - maxEndpointsPerSlice: maxEndpointsPerSlice, - endpointSliceTracker: endpointSliceTracker, - metricsCache: metrics.NewCache(maxEndpointsPerSlice), - topologyCache: topologyCache, - eventRecorder: eventRecorder, - controllerName: controllerName, + client: client, + nodeLister: nodeLister, + maxEndpointsPerSlice: maxEndpointsPerSlice, + endpointSliceTracker: endpointSliceTracker, + metricsCache: metrics.NewCache(maxEndpointsPerSlice), + topologyCache: topologyCache, + trafficDistributionEnabled: trafficDistributionEnabled, + eventRecorder: eventRecorder, + controllerName: controllerName, } } @@ -401,9 +438,15 @@ func (r *Reconciler) finalize( if r.topologyCache != nil && hintsEnabled(service.Annotations) { topologyLabel = "Auto" } + var trafficDistribution string + if r.trafficDistributionEnabled && !hintsEnabled(service.Annotations) { + if service.Spec.TrafficDistribution != nil && *service.Spec.TrafficDistribution == corev1.ServiceTrafficDistributionPreferClose { + trafficDistribution = *service.Spec.TrafficDistribution + } + } numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete) - metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged)) + metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel, 
trafficDistribution).Observe(float64(numSlicesChanged)) return nil } diff --git a/staging/src/k8s.io/endpointslice/reconciler_test.go b/staging/src/k8s.io/endpointslice/reconciler_test.go index 03d4f92e981..cd781a42429 100644 --- a/staging/src/k8s.io/endpointslice/reconciler_test.go +++ b/staging/src/k8s.io/endpointslice/reconciler_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" @@ -1972,6 +1973,209 @@ func TestReconcileTopology(t *testing.T) { } } +// Test reconciliation behaviour for trafficDistribution field. +func TestReconcile_TrafficDistribution(t *testing.T) { + // Setup the following topology for the test. + // + // - node-0 IN zone-a CONTAINS {pod-0} + // - node-1 IN zone-b CONTAINS {pod-1, pod-2, pod-3} + // - node-2 IN zone-c CONTAINS {pod-4, pod-5} + ns := "ns1" + svc, _ := newServiceAndEndpointMeta("foo", ns) + nodes := []*corev1.Node{} + pods := []*corev1.Pod{} + for i := 0; i < 3; i++ { + nodes = append(nodes, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node-%v", i), + Labels: map[string]string{ + corev1.LabelTopologyZone: fmt.Sprintf("zone-%c", 'a'+i), + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{{ + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }}, + Allocatable: corev1.ResourceList{"cpu": resource.MustParse("100m")}, + }, + }) + } + for i := 0; i < 6; i++ { + pods = append(pods, newPod(i, ns, true, 1, false)) + } + pods[0].Spec.NodeName = nodes[0].Name + pods[1].Spec.NodeName = nodes[1].Name + pods[2].Spec.NodeName = nodes[1].Name + pods[3].Spec.NodeName = nodes[1].Name + pods[4].Spec.NodeName = nodes[2].Name + pods[5].Spec.NodeName = nodes[2].Name + + // Define test cases. 
+ + testCases := []struct { + name string + desc string + + trafficDistributionFeatureGateEnabled bool + trafficDistribution string + topologyAnnotation string + + // Defines how many hints belong to a particular zone. + wantHintsDistributionByZone map[string]int + // Number of endpoints where the zone hints are different from the zone of + // the endpoint itself. + wantEndpointsWithCrossZoneHints int + wantMetrics expectedMetrics + }{ + { + name: "trafficDistribution=PreferClose, topologyAnnotation=Disabled", + desc: "When trafficDistribution is enabled and topologyAnnotation is disabled, hints should be distributed as per the trafficDistribution field", + trafficDistributionFeatureGateEnabled: true, + trafficDistribution: corev1.ServiceTrafficDistributionPreferClose, + topologyAnnotation: "Disabled", + wantHintsDistributionByZone: map[string]int{ + "zone-a": 1, // {pod-0} + "zone-b": 3, // {pod-1, pod-2, pod-3} + "zone-c": 2, // {pod-4, pod-5} + }, + wantMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 6, + addedPerSync: 6, + removedPerSync: 0, + numCreated: 1, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 0, // 0 means either topologyAnnotation or trafficDistribution was used. + slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used. + slicesChangedPerSyncTrafficDist: 1, // 1 EPS configured using trafficDistribution. + }, + }, + { + name: "feature gate disabled; trafficDistribution=PreferClose, topologyAnnotation=Disabled", + desc: "When feature gate is disabled, trafficDistribution should be ignored", + trafficDistributionFeatureGateEnabled: false, + trafficDistribution: corev1.ServiceTrafficDistributionPreferClose, + topologyAnnotation: "Disabled", + wantHintsDistributionByZone: map[string]int{"": 6}, // Equivalent to no hints. 
+ wantMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 6, + addedPerSync: 6, + removedPerSync: 0, + numCreated: 1, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 1, // 1 means both topologyAnnotation and trafficDistribution were not used. + slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used. + slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used. + }, + }, + { + name: "trafficDistribution=PreferClose, topologyAnnotation=Auto", + desc: "When trafficDistribution and topologyAnnotation are both enabled, precedence should be given to topologyAnnotation", + trafficDistributionFeatureGateEnabled: true, + trafficDistribution: corev1.ServiceTrafficDistributionPreferClose, + topologyAnnotation: "Auto", + wantHintsDistributionByZone: map[string]int{ + "zone-a": 2, // {pod-0, pod-3} (pod-3 is just an example, it could have also been either of the other two) + "zone-b": 2, // {pod-1, pod-2} + "zone-c": 2, // {pod-4, pod-5} + }, + wantEndpointsWithCrossZoneHints: 1, // since a pod from zone-b is likely assigned a hint for zone-a + wantMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 6, + addedPerSync: 6, + removedPerSync: 0, + numCreated: 1, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 0, // 0 means either topologyAnnotation or trafficDistribution was used. + slicesChangedPerSyncTopology: 1, // 1 EPS configured using topologyAnnotation. + slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used. + }, + }, + { + name: "trafficDistribution=, topologyAnnotation=", + desc: "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added", + trafficDistributionFeatureGateEnabled: true, + trafficDistribution: "", + topologyAnnotation: "", + wantHintsDistributionByZone: map[string]int{"": 6}, // Equivalent to no hints. 
+ wantMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 6, + addedPerSync: 6, + removedPerSync: 0, + numCreated: 1, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 1, // 1 means both topologyAnnotation and trafficDistribution were not used. + slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used. + slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used. + }, + }, + } + + // Make assertions. + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + client := newClientset() + logger, _ := ktesting.NewTestContext(t) + setupMetrics() + + r := newReconciler(client, nodes, defaultMaxEndpointsPerSlice) + r.trafficDistributionEnabled = tc.trafficDistributionFeatureGateEnabled + r.topologyCache = topologycache.NewTopologyCache() + r.topologyCache.SetNodes(logger, nodes) + + service := svc.DeepCopy() + service.Spec.TrafficDistribution = &tc.trafficDistribution + service.Annotations = map[string]string{ + corev1.DeprecatedAnnotationTopologyAwareHints: tc.topologyAnnotation, + } + + err := r.Reconcile(logger, service, pods, nil, time.Now()) + + if err != nil { + t.Errorf("Reconcile(...): return error = %v; want no error", err) + } + + fetchedSlices := fetchEndpointSlices(t, client, ns) + gotHintsDistributionByZone := make(map[string]int) + gotEndpointsWithCrossZoneHints := 0 + for _, slice := range fetchedSlices { + for _, endpoint := range slice.Endpoints { + var zoneHint string + if endpoint.Hints != nil && len(endpoint.Hints.ForZones) == 1 { + zoneHint = endpoint.Hints.ForZones[0].Name + } + gotHintsDistributionByZone[zoneHint]++ + if zoneHint != "" && *endpoint.Zone != zoneHint { + gotEndpointsWithCrossZoneHints++ + } + } + } + + if diff := cmp.Diff(tc.wantHintsDistributionByZone, gotHintsDistributionByZone); diff != "" { + t.Errorf("Reconcile(...): Incorrect distribution of endpoints among zones; (-want, +got)\n%v", diff) + } + if gotEndpointsWithCrossZoneHints != 
tc.wantEndpointsWithCrossZoneHints { + t.Errorf("Reconcile(...): EndpointSlices have endpoints with incorrect number of cross-zone hints; gotEndpointsWithCrossZoneHints=%v, want=%v", gotEndpointsWithCrossZoneHints, tc.wantEndpointsWithCrossZoneHints) + } + + expectMetrics(t, tc.wantMetrics) + }) + } +} + // Test Helpers func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *Reconciler { @@ -1989,6 +2193,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer maxEndpointsPerSlice, endpointsliceutil.NewEndpointSliceTracker(), nil, + false, eventRecorder, controllerName, ) @@ -2112,18 +2317,19 @@ func reconcileHelper(t *testing.T, r *Reconciler, service *corev1.Service, pods // Metrics helpers type expectedMetrics struct { - desiredSlices int - actualSlices int - desiredEndpoints int - addedPerSync int - removedPerSync int - numCreated int - numUpdated int - numDeleted int - slicesChangedPerSync int - slicesChangedPerSyncTopology int - syncSuccesses int - syncErrors int + desiredSlices int + actualSlices int + desiredEndpoints int + addedPerSync int + removedPerSync int + numCreated int + numUpdated int + numDeleted int + slicesChangedPerSync int + slicesChangedPerSyncTopology int + slicesChangedPerSyncTrafficDist int + syncSuccesses int + syncErrors int } func expectMetrics(t *testing.T, em expectedMetrics) { @@ -2177,18 +2383,24 @@ func expectMetrics(t *testing.T, em expectedMetrics) { t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted) } - actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled")) + actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled", "")) handleErr(t, err, "slicesChangedPerSync") if actualSlicesChangedPerSync != float64(em.slicesChangedPerSync) { t.Errorf("Expected 
slicesChangedPerSync to be %d, got %v", em.slicesChangedPerSync, actualSlicesChangedPerSync) } - actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Auto")) + actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Auto", "")) handleErr(t, err, "slicesChangedPerSyncTopology") if actualSlicesChangedPerSyncTopology != float64(em.slicesChangedPerSyncTopology) { t.Errorf("Expected slicesChangedPerSyncTopology to be %d, got %v", em.slicesChangedPerSyncTopology, actualSlicesChangedPerSyncTopology) } + actualSlicesChangedPerSyncTrafficDist, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled", "PreferClose")) + handleErr(t, err, "slicesChangedPerSyncTrafficDist") + if actualSlicesChangedPerSyncTrafficDist != float64(em.slicesChangedPerSyncTrafficDist) { + t.Errorf("Expected slicesChangedPerSyncTrafficDist to be %d, got %v", em.slicesChangedPerSyncTrafficDist, actualSlicesChangedPerSyncTrafficDist) + } + actualSyncSuccesses, err := testutil.GetCounterMetricValue(metrics.EndpointSliceSyncs.WithLabelValues("success")) handleErr(t, err, "syncSuccesses") if actualSyncSuccesses != float64(em.syncSuccesses) { @@ -2210,16 +2422,12 @@ func handleErr(t *testing.T, err error, metricName string) { func setupMetrics() { metrics.RegisterMetrics() - metrics.NumEndpointSlices.Delete(map[string]string{}) - metrics.DesiredEndpointSlices.Delete(map[string]string{}) - metrics.EndpointsDesired.Delete(map[string]string{}) - metrics.EndpointsAddedPerSync.Delete(map[string]string{}) - metrics.EndpointsRemovedPerSync.Delete(map[string]string{}) - metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"}) - metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"}) - metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"}) - 
metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "Disabled"}) - metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "Auto"}) - metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "success"}) - metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "error"}) + metrics.NumEndpointSlices.Reset() + metrics.DesiredEndpointSlices.Reset() + metrics.EndpointsDesired.Reset() + metrics.EndpointsAddedPerSync.Reset() + metrics.EndpointsRemovedPerSync.Reset() + metrics.EndpointSliceChanges.Reset() + metrics.EndpointSlicesChangedPerSync.Reset() + metrics.EndpointSliceSyncs.Reset() }