From 606cae9b4790cd611f4b321f7e65fd9ce4f476f3 Mon Sep 17 00:00:00 2001 From: Gaurav Ghildiyal Date: Wed, 28 Feb 2024 22:01:48 -0800 Subject: [PATCH] Add new metric servicesCountByTrafficDistribution --- .../src/k8s.io/endpointslice/metrics/cache.go | 58 ++++++++++- .../endpointslice/metrics/cache_test.go | 96 +++++++++++++++++++ .../k8s.io/endpointslice/metrics/metrics.go | 13 +++ .../src/k8s.io/endpointslice/reconciler.go | 3 + .../k8s.io/endpointslice/reconciler_test.go | 48 +++++++--- 5 files changed, 200 insertions(+), 18 deletions(-) diff --git a/staging/src/k8s.io/endpointslice/metrics/cache.go b/staging/src/k8s.io/endpointslice/metrics/cache.go index e2681f262fe..6f102bb498e 100644 --- a/staging/src/k8s.io/endpointslice/metrics/cache.go +++ b/staging/src/k8s.io/endpointslice/metrics/cache.go @@ -20,6 +20,7 @@ import ( "math" "sync" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" endpointsliceutil "k8s.io/endpointslice/util" ) @@ -27,8 +28,9 @@ import ( // NewCache returns a new Cache with the specified endpointsPerSlice. func NewCache(endpointsPerSlice int32) *Cache { return &Cache{ - maxEndpointsPerSlice: endpointsPerSlice, - cache: map[types.NamespacedName]*ServicePortCache{}, + maxEndpointsPerSlice: endpointsPerSlice, + cache: map[types.NamespacedName]*ServicePortCache{}, + servicesByTrafficDistribution: make(map[string]map[types.NamespacedName]bool), } } @@ -40,7 +42,7 @@ type Cache struct { maxEndpointsPerSlice int32 // lock protects changes to numEndpoints, numSlicesActual, numSlicesDesired, - // and cache. + // cache and servicesByTrafficDistribution lock sync.Mutex // numEndpoints represents the total number of endpoints stored in // EndpointSlices. @@ -52,8 +54,18 @@ type Cache struct { // cache stores a ServicePortCache grouped by NamespacedNames representing // Services. cache map[types.NamespacedName]*ServicePortCache + // Tracks all services partitioned by their trafficDistribution field. + // + // The type should be read as map[trafficDistribution]setOfServices + servicesByTrafficDistribution map[string]map[types.NamespacedName]bool } +const ( + // Label value for cases when service.spec.trafficDistribution is set to an + // unknown value. + trafficDistributionImplementationSpecific = "ImplementationSpecific" +) + // ServicePortCache tracks values for total numbers of desired endpoints as well // as the efficiency of EndpointSlice endpoints distribution for each unique // Service Port combination. @@ -124,12 +136,46 @@ func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache * c.updateMetrics() } +func (c *Cache) UpdateTrafficDistributionForService(serviceNN types.NamespacedName, trafficDistributionPtr *string) { + c.lock.Lock() + defer c.lock.Unlock() + + defer c.updateMetrics() + + for _, serviceSet := range c.servicesByTrafficDistribution { + delete(serviceSet, serviceNN) + } + + if trafficDistributionPtr == nil { + return + } + + trafficDistribution := *trafficDistributionPtr + // If we don't explicitly recognize a value for trafficDistribution, it should + // be treated as an implementation specific value. All such implementation + // specific values should use the label value "ImplementationSpecific" to not + // explode the metric labels cardinality. + if trafficDistribution != corev1.ServiceTrafficDistributionPreferClose { + trafficDistribution = trafficDistributionImplementationSpecific + } + serviceSet, ok := c.servicesByTrafficDistribution[trafficDistribution] + if !ok { + serviceSet = make(map[types.NamespacedName]bool) + c.servicesByTrafficDistribution[trafficDistribution] = serviceSet + } + serviceSet[serviceNN] = true +} + // DeleteService removes references of a Service from the global cache and // updates the corresponding metrics. func (c *Cache) DeleteService(serviceNN types.NamespacedName) { c.lock.Lock() defer c.lock.Unlock() + for _, serviceSet := range c.servicesByTrafficDistribution { + delete(serviceSet, serviceNN) + } + if spCache, ok := c.cache[serviceNN]; ok { actualSlices, desiredSlices, endpoints := spCache.totals(int(c.maxEndpointsPerSlice)) c.numEndpoints = c.numEndpoints - endpoints @@ -137,7 +183,6 @@ func (c *Cache) DeleteService(serviceNN types.NamespacedName) { c.numSlicesActual -= actualSlices c.updateMetrics() delete(c.cache, serviceNN) - } } @@ -147,6 +192,11 @@ func (c *Cache) updateMetrics() { NumEndpointSlices.WithLabelValues().Set(float64(c.numSlicesActual)) DesiredEndpointSlices.WithLabelValues().Set(float64(c.numSlicesDesired)) EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints)) + + ServicesCountByTrafficDistribution.Reset() + for trafficDistribution, services := range c.servicesByTrafficDistribution { + ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution).Set(float64(len(services))) + } } // numDesiredSlices calculates the number of EndpointSlices that would exist diff --git a/staging/src/k8s.io/endpointslice/metrics/cache_test.go b/staging/src/k8s.io/endpointslice/metrics/cache_test.go index bbf400852f2..adf23591d6c 100644 --- a/staging/src/k8s.io/endpointslice/metrics/cache_test.go +++ b/staging/src/k8s.io/endpointslice/metrics/cache_test.go @@ -20,6 +20,8 @@ import ( "fmt" "testing" + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" endpointsliceutil "k8s.io/endpointslice/util" @@ -89,6 +91,96 @@ func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int } } +// Tests the mutations to servicesByTrafficDistribution field within Cache +// object. +func TestCache_ServicesByTrafficDistribution(t *testing.T) { + cache := NewCache(0) + + service1 := types.NamespacedName{Namespace: "ns1", Name: "service1"} + service2 := types.NamespacedName{Namespace: "ns1", Name: "service2"} + service3 := types.NamespacedName{Namespace: "ns2", Name: "service3"} + service4 := types.NamespacedName{Namespace: "ns3", Name: "service4"} + + // Define helper function for assertion + mustHaveServicesByTrafficDistribution := func(wantServicesByTrafficDistribution map[string]map[types.NamespacedName]bool, desc string) { + t.Helper() + gotServicesByTrafficDistribution := cache.servicesByTrafficDistribution + if diff := cmp.Diff(wantServicesByTrafficDistribution, gotServicesByTrafficDistribution); diff != "" { + t.Fatalf("UpdateTrafficDistributionForService(%v) resulted in unexpected diff for cache.servicesByTrafficDistribution; (-want, +got)\n%v", desc, diff) + } + } + + // Mutate and make assertions + + desc := "service1 starts using trafficDistribution=PreferClose" + cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose)) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {service1: true}, + }, desc) + + desc = "service1 starts using trafficDistribution=PreferClose, retries of similar mutation should be idempotent" + cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose)) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta + corev1.ServiceTrafficDistributionPreferClose: {service1: true}, + }, desc) + + desc = "service2 starts using trafficDistribution=PreferClose" + cache.UpdateTrafficDistributionForService(service2, ptrTo(corev1.ServiceTrafficDistributionPreferClose)) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true}, // Delta + }, desc) + + desc = "service3 starts using trafficDistribution=InvalidValue" + cache.UpdateTrafficDistributionForService(service3, ptrTo("InvalidValue")) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true}, + trafficDistributionImplementationSpecific: {service3: true}, // Delta + }, desc) + + desc = "service4 starts using trafficDistribution=nil" + cache.UpdateTrafficDistributionForService(service4, nil) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta + corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true}, + trafficDistributionImplementationSpecific: {service3: true}, + }, desc) + + desc = "service2 transitions trafficDistribution: PreferClose -> InvalidValue" + cache.UpdateTrafficDistributionForService(service2, ptrTo("InvalidValue")) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {service1: true}, // Delta + trafficDistributionImplementationSpecific: {service3: true, service2: true}, // Delta + }, desc) + + desc = "service3 gets deleted" + cache.DeleteService(service3) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {service1: true}, + trafficDistributionImplementationSpecific: {service2: true}, // Delta + }, desc) + + desc = "service1 transitions trafficDistribution: PreferClose -> empty" + cache.UpdateTrafficDistributionForService(service1, ptrTo("")) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {}, // Delta + trafficDistributionImplementationSpecific: {service1: true, service2: true}, // Delta + }, desc) + + desc = "service1 transitions trafficDistribution: InvalidValue -> nil" + cache.UpdateTrafficDistributionForService(service1, nil) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {}, + trafficDistributionImplementationSpecific: {service2: true}, // Delta + }, desc) + + desc = "service2 transitions trafficDistribution: InvalidValue -> nil" + cache.UpdateTrafficDistributionForService(service2, nil) + mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ + corev1.ServiceTrafficDistributionPreferClose: {}, + trafficDistributionImplementationSpecific: {}, // Delta + }, desc) + +} + func benchmarkUpdateServicePortCache(b *testing.B, num int) { c := NewCache(int32(100)) ns := "benchmark" @@ -132,3 +224,7 @@ func BenchmarkUpdateServicePortCache10000(b *testing.B) { func BenchmarkUpdateServicePortCache100000(b *testing.B) { benchmarkUpdateServicePortCache(b, 100000) } + +func ptrTo[T any](obj T) *T { + return &obj +} diff --git a/staging/src/k8s.io/endpointslice/metrics/metrics.go b/staging/src/k8s.io/endpointslice/metrics/metrics.go index 977142fa067..236a83c3218 100644 --- a/staging/src/k8s.io/endpointslice/metrics/metrics.go +++ b/staging/src/k8s.io/endpointslice/metrics/metrics.go @@ -119,6 +119,18 @@ var ( }, []string{"result"}, // either "success", "stale", or "error" ) + + // ServicesCountByTrafficDistribution tracks the number of Services using some + // specific trafficDistribution + ServicesCountByTrafficDistribution = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "services_count_by_traffic_distribution", + Help: "Number of Services using some specific trafficDistribution", + StabilityLevel: metrics.ALPHA, + }, + []string{"traffic_distribution"}, // One of ["PreferClose", "ImplementationSpecific"] + ) ) var registerMetrics sync.Once @@ -134,5 +146,6 @@ func RegisterMetrics() { legacyregistry.MustRegister(EndpointSliceChanges) legacyregistry.MustRegister(EndpointSlicesChangedPerSync) legacyregistry.MustRegister(EndpointSliceSyncs) + legacyregistry.MustRegister(ServicesCountByTrafficDistribution) }) } diff --git a/staging/src/k8s.io/endpointslice/reconciler.go b/staging/src/k8s.io/endpointslice/reconciler.go index 417666e098f..1ec466c503f 100644 --- a/staging/src/k8s.io/endpointslice/reconciler.go +++ b/staging/src/k8s.io/endpointslice/reconciler.go @@ -310,7 +310,10 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1. } if canUseTrafficDistribution { + r.metricsCache.UpdateTrafficDistributionForService(serviceNN, service.Spec.TrafficDistribution) slicesToCreate, slicesToUpdate, _ = trafficdist.ReconcileHints(service.Spec.TrafficDistribution, slicesToCreate, slicesToUpdate, unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete)) + } else { + r.metricsCache.UpdateTrafficDistributionForService(serviceNN, nil) } err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime) diff --git a/staging/src/k8s.io/endpointslice/reconciler_test.go b/staging/src/k8s.io/endpointslice/reconciler_test.go index cd781a42429..007b3a9293e 100644 --- a/staging/src/k8s.io/endpointslice/reconciler_test.go +++ b/staging/src/k8s.io/endpointslice/reconciler_test.go @@ -2051,6 +2051,9 @@ func TestReconcile_TrafficDistribution(t *testing.T) { slicesChangedPerSync: 0, // 0 means either topologyAnnotation or trafficDistribution was used. slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used. slicesChangedPerSyncTrafficDist: 1, // 1 EPS configured using trafficDistribution. + servicesCountByTrafficDistribution: map[string]int{ + "PreferClose": 1, + }, }, }, { @@ -2102,7 +2105,7 @@ func TestReconcile_TrafficDistribution(t *testing.T) { }, { name: "trafficDistribution=, topologyAnnotation=", - desc: "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added", + desc: "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added, but the servicesCountByTrafficDistribution metric should reflect this", trafficDistributionFeatureGateEnabled: true, trafficDistribution: "", topologyAnnotation: "", @@ -2119,6 +2122,9 @@ func TestReconcile_TrafficDistribution(t *testing.T) { slicesChangedPerSync: 1, // 1 means both topologyAnnotation and trafficDistribution were not used. slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used. slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used. + servicesCountByTrafficDistribution: map[string]int{ + "ImplementationSpecific": 1, + }, }, }, } @@ -2317,19 +2323,20 @@ func reconcileHelper(t *testing.T, r *Reconciler, service *corev1.Service, pods // Metrics helpers type expectedMetrics struct { - desiredSlices int - actualSlices int - desiredEndpoints int - addedPerSync int - removedPerSync int - numCreated int - numUpdated int - numDeleted int - slicesChangedPerSync int - slicesChangedPerSyncTopology int - slicesChangedPerSyncTrafficDist int - syncSuccesses int - syncErrors int + desiredSlices int + actualSlices int + desiredEndpoints int + addedPerSync int + removedPerSync int + numCreated int + numUpdated int + numDeleted int + slicesChangedPerSync int + slicesChangedPerSyncTopology int + slicesChangedPerSyncTrafficDist int + syncSuccesses int + syncErrors int + servicesCountByTrafficDistribution map[string]int } func expectMetrics(t *testing.T, em expectedMetrics) { @@ -2412,6 +2419,18 @@ func expectMetrics(t *testing.T, em expectedMetrics) { if actualSyncErrors != float64(em.syncErrors) { t.Errorf("Expected endpointSliceSyncErrors to be %d, got %v", em.syncErrors, actualSyncErrors) } + + for _, trafficDistribution := range []string{"PreferClose", "ImplementationSpecific"} { + gotServicesCount, err := testutil.GetGaugeMetricValue(metrics.ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution)) + var wantServicesCount int + if em.servicesCountByTrafficDistribution != nil { + wantServicesCount = em.servicesCountByTrafficDistribution[trafficDistribution] + } + handleErr(t, err, fmt.Sprintf("%v[traffic_distribution=%v]", "services_count_by_traffic_distribution", trafficDistribution)) + if int(gotServicesCount) != wantServicesCount { + t.Errorf("Expected servicesCountByTrafficDistribution for traffic_distribution=%v to be %v, got %v", trafficDistribution, wantServicesCount, gotServicesCount) + } + } } func handleErr(t *testing.T, err error, metricName string) { @@ -2430,4 +2449,5 @@ func setupMetrics() { metrics.EndpointSliceChanges.Reset() metrics.EndpointSlicesChangedPerSync.Reset() metrics.EndpointSliceSyncs.Reset() + metrics.ServicesCountByTrafficDistribution.Reset() }