Add new metric servicesCountByTrafficDistribution

Gaurav Ghildiyal 2024-02-28 22:01:48 -08:00
parent 51f86b9124
commit 606cae9b47
5 changed files with 200 additions and 18 deletions
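
The gauge partitions Services by the value of their spec.trafficDistribution field; unrecognized values are collapsed into a single "ImplementationSpecific" label value to keep cardinality bounded. As a rough sketch, the exposition should look like the following (assuming the metric inherits this package's endpoint_slice_controller subsystem prefix; the sample values are illustrative):

# HELP endpoint_slice_controller_services_count_by_traffic_distribution Number of Services using some specific trafficDistribution
# TYPE endpoint_slice_controller_services_count_by_traffic_distribution gauge
endpoint_slice_controller_services_count_by_traffic_distribution{traffic_distribution="PreferClose"} 3
endpoint_slice_controller_services_count_by_traffic_distribution{traffic_distribution="ImplementationSpecific"} 1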

View File

@ -20,6 +20,7 @@ import (
"math"
"sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
endpointsliceutil "k8s.io/endpointslice/util"
)
@ -29,6 +30,7 @@ func NewCache(endpointsPerSlice int32) *Cache {
return &Cache{
maxEndpointsPerSlice: endpointsPerSlice,
cache: map[types.NamespacedName]*ServicePortCache{},
servicesByTrafficDistribution: make(map[string]map[types.NamespacedName]bool),
}
}
@ -40,7 +42,7 @@ type Cache struct {
maxEndpointsPerSlice int32
// lock protects changes to numEndpoints, numSlicesActual, numSlicesDesired,
// cache, and servicesByTrafficDistribution.
lock sync.Mutex
// numEndpoints represents the total number of endpoints stored in
// EndpointSlices.
@ -52,8 +54,18 @@ type Cache struct {
// cache stores a ServicePortCache grouped by NamespacedNames representing
// Services.
cache map[types.NamespacedName]*ServicePortCache
// Tracks all services partitioned by their trafficDistribution field.
//
// The type should be read as map[trafficDistribution]setOfServices
servicesByTrafficDistribution map[string]map[types.NamespacedName]bool
}
const (
// Label value for cases when service.spec.trafficDistribution is set to an
// unknown value.
trafficDistributionImplementationSpecific = "ImplementationSpecific"
)
// ServicePortCache tracks values for total numbers of desired endpoints as well
// as the efficiency of EndpointSlice endpoints distribution for each unique
// Service Port combination.
@ -124,12 +136,46 @@ func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *
c.updateMetrics()
}
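// UpdateTrafficDistributionForService updates the bookkeeping of which
// trafficDistribution bucket the given Service belongs to. A nil
// trafficDistributionPtr means the Service has no usable trafficDistribution
// and is removed from all buckets.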
func (c *Cache) UpdateTrafficDistributionForService(serviceNN types.NamespacedName, trafficDistributionPtr *string) {
c.lock.Lock()
defer c.lock.Unlock()
defer c.updateMetrics()
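// Remove the Service from every bucket first; it is re-added to the
// correct bucket below if trafficDistribution is set.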
for _, serviceSet := range c.servicesByTrafficDistribution {
delete(serviceSet, serviceNN)
}
if trafficDistributionPtr == nil {
return
}
trafficDistribution := *trafficDistributionPtr
// If we don't explicitly recognize a value for trafficDistribution, treat it
// as an implementation-specific value. All such implementation-specific
// values share the label value "ImplementationSpecific" so that the metric's
// label cardinality stays bounded.
if trafficDistribution != corev1.ServiceTrafficDistributionPreferClose {
trafficDistribution = trafficDistributionImplementationSpecific
}
serviceSet, ok := c.servicesByTrafficDistribution[trafficDistribution]
if !ok {
serviceSet = make(map[types.NamespacedName]bool)
c.servicesByTrafficDistribution[trafficDistribution] = serviceSet
}
serviceSet[serviceNN] = true
}
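To make the bucketing semantics concrete, here is a minimal in-package sketch (not part of the commit; it reuses the ptrTo helper defined in the test file below, and the bucket states in the comments follow from the code above):

func ExampleCache_trafficDistributionBuckets() {
	c := NewCache(100)
	web := types.NamespacedName{Namespace: "default", Name: "web"}

	// A recognized value lands in its own bucket:
	// {"PreferClose": {web}}
	c.UpdateTrafficDistributionForService(web, ptrTo(corev1.ServiceTrafficDistributionPreferClose))

	// Any unrecognized value collapses into the shared bucket:
	// {"PreferClose": {}, "ImplementationSpecific": {web}}
	c.UpdateTrafficDistributionForService(web, ptrTo("SomeVendorSpecificValue"))

	// nil removes the Service from every bucket:
	// {"PreferClose": {}, "ImplementationSpecific": {}}
	c.UpdateTrafficDistributionForService(web, nil)
}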
// DeleteService removes references of a Service from the global cache and
// updates the corresponding metrics.
func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
c.lock.Lock()
defer c.lock.Unlock()
for _, serviceSet := range c.servicesByTrafficDistribution {
delete(serviceSet, serviceNN)
}
if spCache, ok := c.cache[serviceNN]; ok {
actualSlices, desiredSlices, endpoints := spCache.totals(int(c.maxEndpointsPerSlice))
c.numEndpoints = c.numEndpoints - endpoints
@ -137,7 +183,6 @@ func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
c.numSlicesActual -= actualSlices
c.updateMetrics()
delete(c.cache, serviceNN)
}
}
@ -147,6 +192,11 @@ func (c *Cache) updateMetrics() {
NumEndpointSlices.WithLabelValues().Set(float64(c.numSlicesActual))
DesiredEndpointSlices.WithLabelValues().Set(float64(c.numSlicesDesired))
EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))
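// Reset() first drops any series whose label value no longer has Services,
// then the current per-bucket counts are re-published.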
ServicesCountByTrafficDistribution.Reset()
for trafficDistribution, services := range c.servicesByTrafficDistribution {
ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution).Set(float64(len(services)))
}
}
// numDesiredSlices calculates the number of EndpointSlices that would exist

View File

@ -20,6 +20,8 @@ import (
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
endpointsliceutil "k8s.io/endpointslice/util"
@ -89,6 +91,96 @@ func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int
}
}
// Tests the mutations to the servicesByTrafficDistribution field within the
// Cache object.
func TestCache_ServicesByTrafficDistribution(t *testing.T) {
cache := NewCache(0)
service1 := types.NamespacedName{Namespace: "ns1", Name: "service1"}
service2 := types.NamespacedName{Namespace: "ns1", Name: "service2"}
service3 := types.NamespacedName{Namespace: "ns2", Name: "service3"}
service4 := types.NamespacedName{Namespace: "ns3", Name: "service4"}
// Define helper function for assertion
mustHaveServicesByTrafficDistribution := func(wantServicesByTrafficDistribution map[string]map[types.NamespacedName]bool, desc string) {
t.Helper()
gotServicesByTrafficDistribution := cache.servicesByTrafficDistribution
if diff := cmp.Diff(wantServicesByTrafficDistribution, gotServicesByTrafficDistribution); diff != "" {
t.Fatalf("UpdateTrafficDistributionForService(%v) resulted in unexpected diff for cache.servicesByTrafficDistribution; (-want, +got)\n%v", desc, diff)
}
}
// Mutate and make assertions
desc := "service1 starts using trafficDistribution=PreferClose"
cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {service1: true},
}, desc)
desc = "service1 starts using trafficDistribution=PreferClose, retries of similar mutation should be idempotent"
cache.UpdateTrafficDistributionForService(service1, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta
corev1.ServiceTrafficDistributionPreferClose: {service1: true},
}, desc)
desc = "service2 starts using trafficDistribution=PreferClose"
cache.UpdateTrafficDistributionForService(service2, ptrTo(corev1.ServiceTrafficDistributionPreferClose))
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true}, // Delta
}, desc)
desc = "service3 starts using trafficDistribution=InvalidValue"
cache.UpdateTrafficDistributionForService(service3, ptrTo("InvalidValue"))
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true},
trafficDistributionImplementationSpecific: {service3: true}, // Delta
}, desc)
desc = "service4 starts using trafficDistribution=nil"
cache.UpdateTrafficDistributionForService(service4, nil)
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{ // No delta
corev1.ServiceTrafficDistributionPreferClose: {service1: true, service2: true},
trafficDistributionImplementationSpecific: {service3: true},
}, desc)
desc = "service2 transitions trafficDistribution: PreferClose -> InvalidValue"
cache.UpdateTrafficDistributionForService(service2, ptrTo("InvalidValue"))
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {service1: true}, // Delta
trafficDistributionImplementationSpecific: {service3: true, service2: true}, // Delta
}, desc)
desc = "service3 gets deleted"
cache.DeleteService(service3)
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {service1: true},
trafficDistributionImplementationSpecific: {service2: true}, // Delta
}, desc)
desc = "service1 transitions trafficDistribution: PreferClose -> empty"
cache.UpdateTrafficDistributionForService(service1, ptrTo(""))
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {}, // Delta
trafficDistributionImplementationSpecific: {service1: true, service2: true}, // Delta
}, desc)
desc = "service1 transitions trafficDistribution: InvalidValue -> nil"
cache.UpdateTrafficDistributionForService(service1, nil)
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {},
trafficDistributionImplementationSpecific: {service2: true}, // Delta
}, desc)
desc = "service2 transitions trafficDistribution: InvalidValue -> nil"
cache.UpdateTrafficDistributionForService(service2, nil)
mustHaveServicesByTrafficDistribution(map[string]map[types.NamespacedName]bool{
corev1.ServiceTrafficDistributionPreferClose: {},
trafficDistributionImplementationSpecific: {}, // Delta
}, desc)
}
func benchmarkUpdateServicePortCache(b *testing.B, num int) {
c := NewCache(int32(100))
ns := "benchmark"
@ -132,3 +224,7 @@ func BenchmarkUpdateServicePortCache10000(b *testing.B) {
func BenchmarkUpdateServicePortCache100000(b *testing.B) {
benchmarkUpdateServicePortCache(b, 100000)
}
func ptrTo[T any](obj T) *T {
return &obj
}

View File

@ -119,6 +119,18 @@ var (
},
[]string{"result"}, // either "success", "stale", or "error"
)
// ServicesCountByTrafficDistribution tracks the number of Services using a
// specific trafficDistribution.
ServicesCountByTrafficDistribution = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: EndpointSliceSubsystem,
Name: "services_count_by_traffic_distribution",
Help: "Number of Services using some specific trafficDistribution",
StabilityLevel: metrics.ALPHA,
},
[]string{"traffic_distribution"}, // One of ["PreferClose", "ImplementationSpecific"]
)
)
var registerMetrics sync.Once
@ -134,5 +146,6 @@ func RegisterMetrics() {
legacyregistry.MustRegister(EndpointSliceChanges)
legacyregistry.MustRegister(EndpointSlicesChangedPerSync)
legacyregistry.MustRegister(EndpointSliceSyncs)
legacyregistry.MustRegister(ServicesCountByTrafficDistribution)
})
}

View File

@ -310,7 +310,10 @@ func (r *Reconciler) reconcileByAddressType(logger klog.Logger, service *corev1.
}
if canUseTrafficDistribution {
r.metricsCache.UpdateTrafficDistributionForService(serviceNN, service.Spec.TrafficDistribution)
slicesToCreate, slicesToUpdate, _ = trafficdist.ReconcileHints(service.Spec.TrafficDistribution, slicesToCreate, slicesToUpdate, unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete))
} else {
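// Passing nil removes the Service from every trafficDistribution bucket
// in the metrics cache, since its trafficDistribution cannot be used.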
r.metricsCache.UpdateTrafficDistributionForService(serviceNN, nil)
}
err := r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)

View File

@ -2051,6 +2051,9 @@ func TestReconcile_TrafficDistribution(t *testing.T) {
slicesChangedPerSync: 0, // 0 means either topologyAnnotation or trafficDistribution was used.
slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used.
slicesChangedPerSyncTrafficDist: 1, // 1 EPS configured using trafficDistribution.
servicesCountByTrafficDistribution: map[string]int{
"PreferClose": 1,
},
},
},
{
@ -2102,7 +2105,7 @@ func TestReconcile_TrafficDistribution(t *testing.T) {
},
{
name: "trafficDistribution=<empty>, topologyAnnotation=<empty>",
desc: "When trafficDistribution and topologyAnnotation are both disabled, no hints should be added, but the servicesCountByTrafficDistribution metric should reflect this",
trafficDistributionFeatureGateEnabled: true,
trafficDistribution: "",
topologyAnnotation: "",
@ -2119,6 +2122,9 @@ func TestReconcile_TrafficDistribution(t *testing.T) {
slicesChangedPerSync: 1, // 1 means both topologyAnnotation and trafficDistribution were not used.
slicesChangedPerSyncTopology: 0, // 0 means topologyAnnotation was not used.
slicesChangedPerSyncTrafficDist: 0, // 0 means trafficDistribution was not used.
servicesCountByTrafficDistribution: map[string]int{
"ImplementationSpecific": 1,
},
},
},
}
@ -2330,6 +2336,7 @@ type expectedMetrics struct {
slicesChangedPerSyncTrafficDist int
syncSuccesses int
syncErrors int
servicesCountByTrafficDistribution map[string]int
}
func expectMetrics(t *testing.T, em expectedMetrics) {
@ -2412,6 +2419,18 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
if actualSyncErrors != float64(em.syncErrors) {
t.Errorf("Expected endpointSliceSyncErrors to be %d, got %v", em.syncErrors, actualSyncErrors)
}
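// Check all known traffic_distribution label values; buckets absent from
// the expectation map must report 0.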
for _, trafficDistribution := range []string{"PreferClose", "ImplementationSpecific"} {
gotServicesCount, err := testutil.GetGaugeMetricValue(metrics.ServicesCountByTrafficDistribution.WithLabelValues(trafficDistribution))
var wantServicesCount int
if em.servicesCountByTrafficDistribution != nil {
wantServicesCount = em.servicesCountByTrafficDistribution[trafficDistribution]
}
handleErr(t, err, fmt.Sprintf("%v[traffic_distribution=%v]", "services_count_by_traffic_distribution", trafficDistribution))
if int(gotServicesCount) != wantServicesCount {
t.Errorf("Expected servicesCountByTrafficDistribution for traffic_distribution=%v to be %v, got %v", trafficDistribution, wantServicesCount, gotServicesCount)
}
}
}
func handleErr(t *testing.T, err error, metricName string) {
@ -2430,4 +2449,5 @@ func setupMetrics() {
metrics.EndpointSliceChanges.Reset()
metrics.EndpointSlicesChangedPerSync.Reset()
metrics.EndpointSliceSyncs.Reset()
metrics.ServicesCountByTrafficDistribution.Reset()
}