Improving performance of EndpointSlice controller metrics cache

Rob Scott 2021-12-16 13:12:55 -08:00
parent eb43b41cfd
commit 242c33615e
No known key found for this signature in database
GPG Key ID: D91A796D0CFF0C5D
2 changed files with 81 additions and 41 deletions
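In short, the commit stops recomputing slice totals across every cached Service on each metrics update and instead maintains running counters (numSlicesActual, numSlicesDesired) that are adjusted by the delta of the one Service that changed. A minimal sketch of that pattern (not code from this commit; all names here are illustrative):

package main

import "fmt"

// runningCache keeps a running total next to its entries so metric reads
// never require rescanning the whole map.
type runningCache struct {
	numEndpoints int
	entries      map[string]int // endpoints per service (illustrative)
}

// set replaces an entry and adjusts the total by the difference,
// making each update O(1) instead of O(len(entries)).
func (c *runningCache) set(key string, endpoints int) {
	c.numEndpoints += endpoints - c.entries[key]
	c.entries[key] = endpoints
}

func main() {
	c := &runningCache{entries: map[string]int{}}
	c.set("svc-a", 182)
	c.set("svc-b", 356)
	c.set("svc-a", 250) // delta of +68; svc-b is never visited
	fmt.Println(c.numEndpoints) // 606
}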

View File

@@ -39,11 +39,16 @@ type Cache struct {
 	// should be added to an EndpointSlice.
 	maxEndpointsPerSlice int32
 
-	// lock protects changes to numEndpoints and cache.
+	// lock protects changes to numEndpoints, numSlicesActual, numSlicesDesired,
+	// and cache.
 	lock sync.Mutex
 	// numEndpoints represents the total number of endpoints stored in
 	// EndpointSlices.
 	numEndpoints int
+	// numSlicesActual represents the total number of EndpointSlices.
+	numSlicesActual int
+	// numSlicesDesired represents the desired number of EndpointSlices.
+	numSlicesDesired int
 	// cache stores a ServicePortCache grouped by NamespacedNames representing
 	// Services.
 	cache map[types.NamespacedName]*ServicePortCache
@@ -77,14 +82,16 @@ func (spc *ServicePortCache) Set(pmKey endpointutil.PortMapKey, eInfo Efficiency
 	spc.items[pmKey] = eInfo
 }
 
-// numEndpoints returns the total number of endpoints represented by a
+// totals returns the total number of endpoints and slices represented by a
 // ServicePortCache.
-func (spc *ServicePortCache) numEndpoints() int {
-	num := 0
+func (spc *ServicePortCache) totals(maxEndpointsPerSlice int) (int, int, int) {
+	var actualSlices, desiredSlices, endpoints int
 	for _, eInfo := range spc.items {
-		num += eInfo.Endpoints
+		endpoints += eInfo.Endpoints
+		actualSlices += eInfo.Slices
+		desiredSlices += numDesiredSlices(eInfo.Endpoints, maxEndpointsPerSlice)
 	}
-	return num
+	return actualSlices, desiredSlices, endpoints
 }
 
 // UpdateServicePortCache updates a ServicePortCache in the global cache for a
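With the fixture used by the benchmark later in this commit (182 endpoints in 2 slices for one port, 356 endpoints in 4 slices for another) and maxEndpointsPerSlice = 100, totals returns (6, 6, 538): 2+4 actual slices, ceil(182/100) + ceil(356/100) = 2+4 desired slices, and 182+356 endpoints.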
@@ -96,15 +103,18 @@ func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
-	prevNumEndpoints := 0
+	var prevActualSlices, prevDesiredSlices, prevEndpoints int
 	if existingSPCache, ok := c.cache[serviceNN]; ok {
-		prevNumEndpoints = existingSPCache.numEndpoints()
+		prevActualSlices, prevDesiredSlices, prevEndpoints = existingSPCache.totals(int(c.maxEndpointsPerSlice))
 	}
 
-	currNumEndpoints := spCache.numEndpoints()
+	currActualSlices, currDesiredSlices, currEndpoints := spCache.totals(int(c.maxEndpointsPerSlice))
 
 	// To keep numEndpoints up to date, add the difference between the number of
 	// endpoints in the provided spCache and any spCache it might be replacing.
-	c.numEndpoints = c.numEndpoints + currNumEndpoints - prevNumEndpoints
+	c.numEndpoints = c.numEndpoints + currEndpoints - prevEndpoints
+	c.numSlicesDesired += currDesiredSlices - prevDesiredSlices
+	c.numSlicesActual += currActualSlices - prevActualSlices
+
 	c.cache[serviceNN] = spCache
 	c.updateMetrics()
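As a worked example with maxEndpointsPerSlice = 100: replacing a cached entry whose totals were (2, 2, 182) with one whose totals are (4, 4, 356) adjusts numSlicesActual and numSlicesDesired by +2 each and numEndpoints by +174, without visiting any other Service in the cache.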
@@ -117,45 +127,29 @@ func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
 	defer c.lock.Unlock()
 
 	if spCache, ok := c.cache[serviceNN]; ok {
-		c.numEndpoints = c.numEndpoints - spCache.numEndpoints()
-		delete(c.cache, serviceNN)
+		actualSlices, desiredSlices, endpoints := spCache.totals(int(c.maxEndpointsPerSlice))
+		c.numEndpoints = c.numEndpoints - endpoints
+		c.numSlicesDesired -= desiredSlices
+		c.numSlicesActual -= actualSlices
 		c.updateMetrics()
+		delete(c.cache, serviceNN)
 	}
 }
 
-// metricsUpdate stores a desired and actual number of EndpointSlices.
-type metricsUpdate struct {
-	desired, actual int
-}
-
-// desiredAndActualSlices returns a metricsUpdate with the desired and actual
-// number of EndpointSlices given the current values in the cache.
-// Must be called holding lock.
-func (c *Cache) desiredAndActualSlices() metricsUpdate {
-	mUpdate := metricsUpdate{}
-	for _, spCache := range c.cache {
-		for _, eInfo := range spCache.items {
-			mUpdate.actual += eInfo.Slices
-			mUpdate.desired += numDesiredSlices(eInfo.Endpoints, int(c.maxEndpointsPerSlice))
-		}
-	}
-	return mUpdate
-}
-
 // updateMetrics updates metrics with the values from this Cache.
 // Must be called holding lock.
 func (c *Cache) updateMetrics() {
-	mUpdate := c.desiredAndActualSlices()
-	NumEndpointSlices.WithLabelValues().Set(float64(mUpdate.actual))
-	DesiredEndpointSlices.WithLabelValues().Set(float64(mUpdate.desired))
+	NumEndpointSlices.WithLabelValues().Set(float64(c.numSlicesActual))
+	DesiredEndpointSlices.WithLabelValues().Set(float64(c.numSlicesDesired))
 	EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))
 }
 
 // numDesiredSlices calculates the number of EndpointSlices that would exist
 // with ideal endpoint distribution.
-func numDesiredSlices(numEndpoints, maxPerSlice int) int {
-	if numEndpoints <= maxPerSlice {
+func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
+	if numEndpoints <= maxEndpointsPerSlice {
 		return 1
 	}
-	return int(math.Ceil(float64(numEndpoints) / float64(maxPerSlice)))
+	return int(math.Ceil(float64(numEndpoints) / float64(maxEndpointsPerSlice)))
 }
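For instance, numDesiredSlices(182, 100) = ceil(182/100) = 2 and numDesiredSlices(356, 100) = 4. Note that the numEndpoints <= maxEndpointsPerSlice branch also means a Service with zero endpoints still counts as desiring one (empty) slice.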

View File

@@ -17,11 +17,14 @@ limitations under the License.
 */
 
 package metrics
 
 import (
+	"fmt"
 	"testing"
 
+	discovery "k8s.io/api/discovery/v1"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/kubernetes/pkg/controller/util/endpoint"
+	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
+	utilpointer "k8s.io/utils/pointer"
 )
 
@@ -59,14 +62,57 @@ func TestNumEndpointsAndSlices(t *testing.T) {
 func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) {
 	t.Helper()
-	mUpdate := c.desiredAndActualSlices()
-	if mUpdate.desired != desired {
-		t.Errorf("Expected numEndpointSlices to be %d, got %d", desired, mUpdate.desired)
+	if c.numSlicesDesired != desired {
+		t.Errorf("Expected numSlicesDesired to be %d, got %d", desired, c.numSlicesDesired)
 	}
-	if mUpdate.actual != actual {
-		t.Errorf("Expected desiredEndpointSlices to be %d, got %d", actual, mUpdate.actual)
+	if c.numSlicesActual != actual {
+		t.Errorf("Expected numSlicesActual to be %d, got %d", actual, c.numSlicesActual)
 	}
 	if c.numEndpoints != numEndpoints {
 		t.Errorf("Expected numEndpoints to be %d, got %d", numEndpoints, c.numEndpoints)
 	}
 }
 
+func benchmarkUpdateServicePortCache(b *testing.B, num int) {
+	c := NewCache(int32(100))
+	ns := "benchmark"
+	httpKey := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: utilpointer.Int32Ptr(80)}})
+	httpsKey := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: utilpointer.Int32Ptr(443)}})
+
+	spCache := &ServicePortCache{items: map[endpointutil.PortMapKey]EfficiencyInfo{
+		httpKey: {
+			Endpoints: 182,
+			Slices:    2,
+		},
+		httpsKey: {
+			Endpoints: 356,
+			Slices:    4,
+		},
+	}}
+
+	for i := 0; i < num; i++ {
+		nName := types.NamespacedName{Namespace: ns, Name: fmt.Sprintf("service-%d", i)}
+		c.UpdateServicePortCache(nName, spCache)
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		nName := types.NamespacedName{Namespace: ns, Name: fmt.Sprintf("bench-%d", i)}
+		c.UpdateServicePortCache(nName, spCache)
+	}
+}
+
+func BenchmarkUpdateServicePortCache100(b *testing.B) {
+	benchmarkUpdateServicePortCache(b, 100)
+}
+
+func BenchmarkUpdateServicePortCache1000(b *testing.B) {
+	benchmarkUpdateServicePortCache(b, 1000)
+}
+
+func BenchmarkUpdateServicePortCache10000(b *testing.B) {
+	benchmarkUpdateServicePortCache(b, 10000)
+}
+
+func BenchmarkUpdateServicePortCache100000(b *testing.B) {
+	benchmarkUpdateServicePortCache(b, 100000)
+}
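These benchmarks can be run with the standard Go tooling, e.g. (the package path is an assumption, not stated in this commit):

go test -bench=UpdateServicePortCache -benchmem ./pkg/controller/endpointslice/metrics/

Because UpdateServicePortCache now applies only the delta for the Service being updated, rather than rescanning every cached Service via desiredAndActualSlices(), per-operation time should stay roughly flat from num=100 to num=100000 instead of growing with the number of cached Services.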