diff --git a/pkg/controller/endpointslice/metrics/cache.go b/pkg/controller/endpointslice/metrics/cache.go index 516002eae61..86a67f8a54b 100644 --- a/pkg/controller/endpointslice/metrics/cache.go +++ b/pkg/controller/endpointslice/metrics/cache.go @@ -39,11 +39,16 @@ type Cache struct { // should be added to an EndpointSlice. maxEndpointsPerSlice int32 - // lock protects changes to numEndpoints and cache. + // lock protects changes to numEndpoints, numSlicesActual, numSlicesDesired, + // and cache. lock sync.Mutex // numEndpoints represents the total number of endpoints stored in // EndpointSlices. numEndpoints int + // numSlicesActual represents the total number of EndpointSlices. + numSlicesActual int + // numSlicesDesired represents the desired number of EndpointSlices. + numSlicesDesired int // cache stores a ServicePortCache grouped by NamespacedNames representing // Services. cache map[types.NamespacedName]*ServicePortCache @@ -77,14 +82,16 @@ func (spc *ServicePortCache) Set(pmKey endpointutil.PortMapKey, eInfo Efficiency spc.items[pmKey] = eInfo } -// numEndpoints returns the total number of endpoints represented by a +// totals returns the total number of endpoints and slices represented by a // ServicePortCache. -func (spc *ServicePortCache) numEndpoints() int { - num := 0 +func (spc *ServicePortCache) totals(maxEndpointsPerSlice int) (int, int, int) { + var actualSlices, desiredSlices, endpoints int for _, eInfo := range spc.items { - num += eInfo.Endpoints + endpoints += eInfo.Endpoints + actualSlices += eInfo.Slices + desiredSlices += numDesiredSlices(eInfo.Endpoints, maxEndpointsPerSlice) } - return num + return actualSlices, desiredSlices, endpoints } // UpdateServicePortCache updates a ServicePortCache in the global cache for a @@ -96,15 +103,18 @@ func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache * c.lock.Lock() defer c.lock.Unlock() - prevNumEndpoints := 0 + var prevActualSlices, prevDesiredSlices, prevEndpoints int if existingSPCache, ok := c.cache[serviceNN]; ok { - prevNumEndpoints = existingSPCache.numEndpoints() + prevActualSlices, prevDesiredSlices, prevEndpoints = existingSPCache.totals(int(c.maxEndpointsPerSlice)) } - currNumEndpoints := spCache.numEndpoints() + currActualSlices, currDesiredSlices, currEndpoints := spCache.totals(int(c.maxEndpointsPerSlice)) // To keep numEndpoints up to date, add the difference between the number of // endpoints in the provided spCache and any spCache it might be replacing. - c.numEndpoints = c.numEndpoints + currNumEndpoints - prevNumEndpoints + c.numEndpoints = c.numEndpoints + currEndpoints - prevEndpoints + + c.numSlicesDesired += currDesiredSlices - prevDesiredSlices + c.numSlicesActual += currActualSlices - prevActualSlices c.cache[serviceNN] = spCache c.updateMetrics() @@ -117,45 +127,29 @@ func (c *Cache) DeleteService(serviceNN types.NamespacedName) { defer c.lock.Unlock() if spCache, ok := c.cache[serviceNN]; ok { - c.numEndpoints = c.numEndpoints - spCache.numEndpoints() - delete(c.cache, serviceNN) + actualSlices, desiredSlices, endpoints := spCache.totals(int(c.maxEndpointsPerSlice)) + c.numEndpoints = c.numEndpoints - endpoints + c.numSlicesDesired -= desiredSlices + c.numSlicesActual -= actualSlices c.updateMetrics() - } -} + delete(c.cache, serviceNN) -// metricsUpdate stores a desired and actual number of EndpointSlices. -type metricsUpdate struct { - desired, actual int -} - -// desiredAndActualSlices returns a metricsUpdate with the desired and actual -// number of EndpointSlices given the current values in the cache. -// Must be called holding lock. -func (c *Cache) desiredAndActualSlices() metricsUpdate { - mUpdate := metricsUpdate{} - for _, spCache := range c.cache { - for _, eInfo := range spCache.items { - mUpdate.actual += eInfo.Slices - mUpdate.desired += numDesiredSlices(eInfo.Endpoints, int(c.maxEndpointsPerSlice)) - } } - return mUpdate } // updateMetrics updates metrics with the values from this Cache. // Must be called holding lock. func (c *Cache) updateMetrics() { - mUpdate := c.desiredAndActualSlices() - NumEndpointSlices.WithLabelValues().Set(float64(mUpdate.actual)) - DesiredEndpointSlices.WithLabelValues().Set(float64(mUpdate.desired)) + NumEndpointSlices.WithLabelValues().Set(float64(c.numSlicesActual)) + DesiredEndpointSlices.WithLabelValues().Set(float64(c.numSlicesDesired)) EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints)) } // numDesiredSlices calculates the number of EndpointSlices that would exist // with ideal endpoint distribution. -func numDesiredSlices(numEndpoints, maxPerSlice int) int { - if numEndpoints <= maxPerSlice { +func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int { + if numEndpoints <= maxEndpointsPerSlice { return 1 } - return int(math.Ceil(float64(numEndpoints) / float64(maxPerSlice))) + return int(math.Ceil(float64(numEndpoints) / float64(maxEndpointsPerSlice))) } diff --git a/pkg/controller/endpointslice/metrics/cache_test.go b/pkg/controller/endpointslice/metrics/cache_test.go index 508d4cebf66..df88ec18325 100644 --- a/pkg/controller/endpointslice/metrics/cache_test.go +++ b/pkg/controller/endpointslice/metrics/cache_test.go @@ -17,11 +17,14 @@ limitations under the License. package metrics import ( + "fmt" "testing" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/kubernetes/pkg/controller/util/endpoint" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" + utilpointer "k8s.io/utils/pointer" ) func TestNumEndpointsAndSlices(t *testing.T) { @@ -59,14 +62,57 @@ func TestNumEndpointsAndSlices(t *testing.T) { func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) { t.Helper() - mUpdate := c.desiredAndActualSlices() - if mUpdate.desired != desired { - t.Errorf("Expected numEndpointSlices to be %d, got %d", desired, mUpdate.desired) + if c.numSlicesDesired != desired { + t.Errorf("Expected numSlicesDesired to be %d, got %d", desired, c.numSlicesDesired) } - if mUpdate.actual != actual { - t.Errorf("Expected desiredEndpointSlices to be %d, got %d", actual, mUpdate.actual) + if c.numSlicesActual != actual { + t.Errorf("Expected numSlicesActual to be %d, got %d", actual, c.numSlicesActual) } if c.numEndpoints != numEndpoints { t.Errorf("Expected numEndpoints to be %d, got %d", numEndpoints, c.numEndpoints) } } + +func benchmarkUpdateServicePortCache(b *testing.B, num int) { + c := NewCache(int32(100)) + ns := "benchmark" + httpKey := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: utilpointer.Int32Ptr(80)}}) + httpsKey := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: utilpointer.Int32Ptr(443)}}) + spCache := &ServicePortCache{items: map[endpointutil.PortMapKey]EfficiencyInfo{ + httpKey: { + Endpoints: 182, + Slices: 2, + }, + httpsKey: { + Endpoints: 356, + Slices: 4, + }, + }} + + for i := 0; i < num; i++ { + nName := types.NamespacedName{Namespace: ns, Name: fmt.Sprintf("service-%d", i)} + c.UpdateServicePortCache(nName, spCache) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + nName := types.NamespacedName{Namespace: ns, Name: fmt.Sprintf("bench-%d", i)} + c.UpdateServicePortCache(nName, spCache) + } +} + +func BenchmarkUpdateServicePortCache100(b *testing.B) { + benchmarkUpdateServicePortCache(b, 100) +} + +func BenchmarkUpdateServicePortCache1000(b *testing.B) { + benchmarkUpdateServicePortCache(b, 1000) +} + +func BenchmarkUpdateServicePortCache10000(b *testing.B) { + benchmarkUpdateServicePortCache(b, 10000) +} + +func BenchmarkUpdateServicePortCache100000(b *testing.B) { + benchmarkUpdateServicePortCache(b, 100000) +}