Start reconciling on the new field

This commit is contained in:
Gaurav Ghildiyal 2024-02-23 19:26:17 -08:00
parent 9513f75089
commit 51a3fa2e6f
4 changed files with 293 additions and 38 deletions

View File

@ -173,6 +173,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
c.maxEndpointsPerSlice,
c.endpointSliceTracker,
c.topologyCache,
utilfeature.DefaultFeatureGate.Enabled(features.ServiceTrafficDistribution),
c.eventRecorder,
controllerName,
)

View File

@ -102,7 +102,10 @@ var (
Name: "endpointslices_changed_per_sync",
Help: "Number of EndpointSlices changed on each Service sync",
},
[]string{"topology"}, // either "Auto" or "Disabled"
[]string{
"topology", // either "Auto" or "Disabled"
"traffic_distribution", // "PreferClose" or <empty>
},
)
// EndpointSliceSyncs tracks the number of sync operations the controller

View File

@ -35,6 +35,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/endpointslice/metrics"
"k8s.io/endpointslice/topologycache"
"k8s.io/endpointslice/trafficdist"
endpointsliceutil "k8s.io/endpointslice/util"
"k8s.io/klog/v2"
)
@ -50,6 +51,9 @@ type Reconciler struct {
// topologyCache tracks the distribution of Nodes and endpoints across zones
// to enable TopologyAwareHints.
topologyCache *topologycache.TopologyCache
// trafficDistributionEnabled determines if endpointDistribution field is to
// be considered when reconciling EndpointSlice hints.
trafficDistributionEnabled bool
// eventRecorder allows Reconciler to record and publish events.
eventRecorder record.EventRecorder
controllerName string
@ -261,9 +265,32 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
}
canUseTrafficDistribution := r.trafficDistributionEnabled && !hintsEnabled(service.Annotations)
// Check if we need to add/remove hints based on the topology annotation.
//
// This if/else clause can be removed once the annotation has been deprecated.
// Ref: https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/4444-service-routing-preference
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
// Reaching this point means that we need to configure hints based on the
// topology annotation.
slicesToCreate, slicesToUpdate, events = r.topologyCache.AddHints(logger, si)
} else {
// Reaching this point means that we will not be configuring hints based on
// the topology annotation. We need to do 2 things:
// 1. If hints were added previously based on the annotation, we need to
// clear up any locally cached hints from the topologyCache object.
// 2. Optionally remove the actual hints from the EndpointSlice if we know
// that the `trafficDistribution` field is also NOT being used. In other
// words, if we know that the `trafficDistribution` field has been
// correctly configured by the customer, we DO NOT remove the hints and
// wait for the trafficDist handlers to correctly configure them. Always
// unconditionally removing hints here (and letting them get readded by
// the trafficDist) adds extra overhead in the form of DeepCopy (done
// within topologyCache.RemoveHints)
// Check 1.
if r.topologyCache != nil {
if r.topologyCache.HasPopulatedHints(si.ServiceKey) {
logger.Info("TopologyAwareHints annotation has changed, removing hints", "serviceKey", si.ServiceKey, "addressType", si.AddressType)
@ -275,8 +302,17 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
}
r.topologyCache.RemoveHints(si.ServiceKey, addressType)
}
// Check 2.
if !canUseTrafficDistribution {
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
}
}
if canUseTrafficDistribution {
slicesToCreate, slicesToUpdate, _ = trafficdist.ReconcileHints(service.Spec.TrafficDistribution, slicesToCreate, slicesToUpdate, unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete))
}
err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
if err != nil {
errs = append(errs, err)
@ -288,7 +324,7 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
}
func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, eventRecorder record.EventRecorder, controllerName string) *Reconciler {
func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister, maxEndpointsPerSlice int32, endpointSliceTracker *endpointsliceutil.EndpointSliceTracker, topologyCache *topologycache.TopologyCache, trafficDistributionEnabled bool, eventRecorder record.EventRecorder, controllerName string) *Reconciler {
return &Reconciler{
client: client,
nodeLister: nodeLister,
@ -296,6 +332,7 @@ func NewReconciler(client clientset.Interface, nodeLister corelisters.NodeLister
endpointSliceTracker: endpointSliceTracker,
metricsCache: metrics.NewCache(maxEndpointsPerSlice),
topologyCache: topologyCache,
trafficDistributionEnabled: trafficDistributionEnabled,
eventRecorder: eventRecorder,
controllerName: controllerName,
}
@ -401,9 +438,15 @@ func (r *Reconciler) finalize(
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
topologyLabel = "Auto"
}
var trafficDistribution string
if r.trafficDistributionEnabled && !hintsEnabled(service.Annotations) {
if service.Spec.TrafficDistribution != nil && *service.Spec.TrafficDistribution == corev1.ServiceTrafficDistributionPreferClose {
trafficDistribution = *service.Spec.TrafficDistribution
}
}
numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete)
metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged))
metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel, trafficDistribution).Observe(float64(numSlicesChanged))
return nil
}

View File

@ -24,6 +24,7 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
@ -1972,6 +1973,209 @@ func TestReconcileTopology(t *testing.T) {
}
}
// TestReconcile_TrafficDistribution verifies reconciliation behaviour for the
// trafficDistribution field: which zone hints get written to EndpointSlices
// depending on the feature gate, the field value, and the (deprecated)
// topology-aware-hints annotation, plus the metrics emitted for each case.
func TestReconcile_TrafficDistribution(t *testing.T) {
	// Setup the following topology for the test.
	//
	// - node-0 IN zone-a CONTAINS {pod-0}
	// - node-1 IN zone-b CONTAINS {pod-1, pod-2, pod-3}
	// - node-2 IN zone-c CONTAINS {pod-4, pod-5}
	ns := "ns1"
	svc, _ := newServiceAndEndpointMeta("foo", ns)
	nodes := []*corev1.Node{}
	pods := []*corev1.Pod{}
	for i := 0; i < 3; i++ {
		nodes = append(nodes, &corev1.Node{
			ObjectMeta: metav1.ObjectMeta{
				Name: fmt.Sprintf("node-%v", i),
				Labels: map[string]string{
					corev1.LabelTopologyZone: fmt.Sprintf("zone-%c", 'a'+i),
				},
			},
			Status: corev1.NodeStatus{
				Conditions: []corev1.NodeCondition{{
					Type:   corev1.NodeReady,
					Status: corev1.ConditionTrue,
				}},
				Allocatable: corev1.ResourceList{"cpu": resource.MustParse("100m")},
			},
		})
	}
	for i := 0; i < 6; i++ {
		pods = append(pods, newPod(i, ns, true, 1, false))
	}
	pods[0].Spec.NodeName = nodes[0].Name
	pods[1].Spec.NodeName = nodes[1].Name
	pods[2].Spec.NodeName = nodes[1].Name
	pods[3].Spec.NodeName = nodes[1].Name
	pods[4].Spec.NodeName = nodes[2].Name
	pods[5].Spec.NodeName = nodes[2].Name

	// Define test cases.
	testCases := []struct {
		name string
		desc string

		trafficDistributionFeatureGateEnabled bool
		trafficDistribution                   string
		topologyAnnotation                    string

		// Defines how many hints belong to a particular zone.
		wantHintsDistributionByZone map[string]int
		// Number of endpoints where the zone hints are different from the zone of
		// the endpoint itself.
		wantEndpointsWithCrossZoneHints int
		wantMetrics                     expectedMetrics
	}{
		{
			name:                                  "trafficDistribution=PreferClose, topologyAnnotation=Disabled",
			desc:                                  "When trafficDistribution is enabled and topologyAnnotation is disabled, hints should be distributed as per the trafficDistribution field",
			trafficDistributionFeatureGateEnabled: true,
			trafficDistribution:                   corev1.ServiceTrafficDistributionPreferClose,
			topologyAnnotation:                    "Disabled",
			wantHintsDistributionByZone: map[string]int{
				"zone-a": 1, // {pod-0}
				"zone-b": 3, // {pod-1, pod-2, pod-3}
				"zone-c": 2, // {pod-4, pod-5}
			},
			wantMetrics: expectedMetrics{
				desiredSlices:                   1,
				actualSlices:                    1,
				desiredEndpoints:                6,
				addedPerSync:                    6,
				removedPerSync:                  0,
				numCreated:                      1,
				numUpdated:                      0,
				numDeleted:                      0,
				slicesChangedPerSync:            0, // 0 means either topologyAnnotation or trafficDistribution was used.
				slicesChangedPerSyncTopology:    0, // 0 means topologyAnnotation was not used.
				slicesChangedPerSyncTrafficDist: 1, // 1 EPS configured using trafficDistribution.
			},
		},
		{
			name:                                  "feature gate disabled; trafficDistribution=PreferClose, topologyAnnotation=Disabled",
			desc:                                  "When feature gate is disabled, trafficDistribution should be ignored",
			trafficDistributionFeatureGateEnabled: false,
			trafficDistribution:                   corev1.ServiceTrafficDistributionPreferClose,
			topologyAnnotation:                    "Disabled",
			wantHintsDistributionByZone:           map[string]int{"": 6}, // Equivalent to no hints.
			wantMetrics: expectedMetrics{
				desiredSlices:                   1,
				actualSlices:                    1,
				desiredEndpoints:                6,
				addedPerSync:                    6,
				removedPerSync:                  0,
				numCreated:                      1,
				numUpdated:                      0,
				numDeleted:                      0,
				slicesChangedPerSync:            1, // 1 means both topologyAnnotation and trafficDistribution were not used.
				slicesChangedPerSyncTopology:    0, // 0 means topologyAnnotation was not used.
				slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
			},
		},
		{
			name:                                  "trafficDistribution=PreferClose, topologyAnnotation=Auto",
			desc:                                  "When trafficDistribution and topologyAnnotation are both enabled, precedence should be given to topologyAnnotation",
			trafficDistributionFeatureGateEnabled: true,
			trafficDistribution:                   corev1.ServiceTrafficDistributionPreferClose,
			topologyAnnotation:                    "Auto",
			wantHintsDistributionByZone: map[string]int{
				"zone-a": 2, // {pod-0, pod-3} (pod-3 is just an example, it could have also been either of the other two)
				"zone-b": 2, // {pod-1, pod-2}
				"zone-c": 2, // {pod-4, pod-5}
			},
			wantEndpointsWithCrossZoneHints: 1, // since a pod from zone-b is likely assigned a hint for zone-a
			wantMetrics: expectedMetrics{
				desiredSlices:                   1,
				actualSlices:                    1,
				desiredEndpoints:                6,
				addedPerSync:                    6,
				removedPerSync:                  0,
				numCreated:                      1,
				numUpdated:                      0,
				numDeleted:                      0,
				slicesChangedPerSync:            0, // 0 means either topologyAnnotation or trafficDistribution was used.
				slicesChangedPerSyncTopology:    1, // 1 EPS configured using topologyAnnotation.
				slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
			},
		},
		{
			name:                                  "trafficDistribution=<empty>, topologyAnnotation=<empty>",
			desc:                                  "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added",
			trafficDistributionFeatureGateEnabled: true,
			trafficDistribution:                   "",
			topologyAnnotation:                    "",
			wantHintsDistributionByZone:           map[string]int{"": 6}, // Equivalent to no hints.
			wantMetrics: expectedMetrics{
				desiredSlices:                   1,
				actualSlices:                    1,
				desiredEndpoints:                6,
				addedPerSync:                    6,
				removedPerSync:                  0,
				numCreated:                      1,
				numUpdated:                      0,
				numDeleted:                      0,
				slicesChangedPerSync:            1, // 1 means both topologyAnnotation and trafficDistribution were not used.
				slicesChangedPerSyncTopology:    0, // 0 means topologyAnnotation was not used.
				slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
			},
		},
	}

	// Make assertions.
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			client := newClientset()
			logger, _ := ktesting.NewTestContext(t)
			setupMetrics()

			r := newReconciler(client, nodes, defaultMaxEndpointsPerSlice)
			r.trafficDistributionEnabled = tc.trafficDistributionFeatureGateEnabled
			r.topologyCache = topologycache.NewTopologyCache()
			r.topologyCache.SetNodes(logger, nodes)

			service := svc.DeepCopy()
			service.Spec.TrafficDistribution = &tc.trafficDistribution
			service.Annotations = map[string]string{
				corev1.DeprecatedAnnotationTopologyAwareHints: tc.topologyAnnotation,
			}

			err := r.Reconcile(logger, service, pods, nil, time.Now())
			if err != nil {
				t.Errorf("Reconcile(...): return error = %v; want no error", err)
			}

			// Tally the zone hints actually written to the EndpointSlices and
			// count endpoints whose hint points at a different zone than their own.
			fetchedSlices := fetchEndpointSlices(t, client, ns)
			gotHintsDistributionByZone := make(map[string]int)
			gotEndpointsWithCrossZoneHints := 0
			for _, slice := range fetchedSlices {
				for _, endpoint := range slice.Endpoints {
					var zoneHint string
					if endpoint.Hints != nil && len(endpoint.Hints.ForZones) == 1 {
						zoneHint = endpoint.Hints.ForZones[0].Name
					}
					gotHintsDistributionByZone[zoneHint]++
					if zoneHint != "" && *endpoint.Zone != zoneHint {
						gotEndpointsWithCrossZoneHints++
					}
				}
			}

			if diff := cmp.Diff(tc.wantHintsDistributionByZone, gotHintsDistributionByZone); diff != "" {
				t.Errorf("Reconcile(...): Incorrect distribution of endpoints among zones; (-want, +got)\n%v", diff)
			}
			if gotEndpointsWithCrossZoneHints != tc.wantEndpointsWithCrossZoneHints {
				t.Errorf("Reconcile(...): EndpointSlices have endpoints with incorrect number of cross-zone hints; gotEndpointsWithCrossZoneHints=%v, want=%v", gotEndpointsWithCrossZoneHints, tc.wantEndpointsWithCrossZoneHints)
			}

			expectMetrics(t, tc.wantMetrics)
		})
	}
}
// Test Helpers
func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *Reconciler {
@ -1989,6 +2193,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer
maxEndpointsPerSlice,
endpointsliceutil.NewEndpointSliceTracker(),
nil,
false,
eventRecorder,
controllerName,
)
@ -2122,6 +2327,7 @@ type expectedMetrics struct {
numDeleted int
slicesChangedPerSync int
slicesChangedPerSyncTopology int
slicesChangedPerSyncTrafficDist int
syncSuccesses int
syncErrors int
}
@ -2177,18 +2383,24 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted)
}
actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled"))
actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled", ""))
handleErr(t, err, "slicesChangedPerSync")
if actualSlicesChangedPerSync != float64(em.slicesChangedPerSync) {
t.Errorf("Expected slicesChangedPerSync to be %d, got %v", em.slicesChangedPerSync, actualSlicesChangedPerSync)
}
actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Auto"))
actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Auto", ""))
handleErr(t, err, "slicesChangedPerSyncTopology")
if actualSlicesChangedPerSyncTopology != float64(em.slicesChangedPerSyncTopology) {
t.Errorf("Expected slicesChangedPerSyncTopology to be %d, got %v", em.slicesChangedPerSyncTopology, actualSlicesChangedPerSyncTopology)
}
actualSlicesChangedPerSyncTrafficDist, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("Disabled", "PreferClose"))
handleErr(t, err, "slicesChangedPerSyncTrafficDist")
if actualSlicesChangedPerSyncTrafficDist != float64(em.slicesChangedPerSyncTrafficDist) {
t.Errorf("Expected slicesChangedPerSyncTrafficDist to be %d, got %v", em.slicesChangedPerSyncTrafficDist, actualSlicesChangedPerSyncTopology)
}
actualSyncSuccesses, err := testutil.GetCounterMetricValue(metrics.EndpointSliceSyncs.WithLabelValues("success"))
handleErr(t, err, "syncSuccesses")
if actualSyncSuccesses != float64(em.syncSuccesses) {
@ -2210,16 +2422,12 @@ func handleErr(t *testing.T, err error, metricName string) {
// setupMetrics (re-)registers the controller metrics and clears every metric
// vector so each test case observes only its own sync. Reset() removes all
// child metrics of a vector regardless of label values, which is why the
// older per-label Delete(...) calls are no longer needed (they missed label
// combinations such as the new "traffic_distribution" label).
func setupMetrics() {
	metrics.RegisterMetrics()
	metrics.NumEndpointSlices.Reset()
	metrics.DesiredEndpointSlices.Reset()
	metrics.EndpointsDesired.Reset()
	metrics.EndpointsAddedPerSync.Reset()
	metrics.EndpointsRemovedPerSync.Reset()
	metrics.EndpointSliceChanges.Reset()
	metrics.EndpointSlicesChangedPerSync.Reset()
	metrics.EndpointSliceSyncs.Reset()
}