mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 19:56:01 +00:00
Merge pull request #107091 from robscott/endpointslice-metrics-perf
Improving performance of EndpointSlice controller metrics cache
This commit is contained in:
commit
13e97453f9
@ -39,11 +39,16 @@ type Cache struct {
|
||||
// should be added to an EndpointSlice.
|
||||
maxEndpointsPerSlice int32
|
||||
|
||||
// lock protects changes to numEndpoints and cache.
|
||||
// lock protects changes to numEndpoints, numSlicesActual, numSlicesDesired,
|
||||
// and cache.
|
||||
lock sync.Mutex
|
||||
// numEndpoints represents the total number of endpoints stored in
|
||||
// EndpointSlices.
|
||||
numEndpoints int
|
||||
// numSlicesActual represents the total number of EndpointSlices.
|
||||
numSlicesActual int
|
||||
// numSlicesDesired represents the desired number of EndpointSlices.
|
||||
numSlicesDesired int
|
||||
// cache stores a ServicePortCache grouped by NamespacedNames representing
|
||||
// Services.
|
||||
cache map[types.NamespacedName]*ServicePortCache
|
||||
@ -77,14 +82,16 @@ func (spc *ServicePortCache) Set(pmKey endpointutil.PortMapKey, eInfo Efficiency
|
||||
spc.items[pmKey] = eInfo
|
||||
}
|
||||
|
||||
// numEndpoints returns the total number of endpoints represented by a
|
||||
// totals returns the total number of endpoints and slices represented by a
|
||||
// ServicePortCache.
|
||||
func (spc *ServicePortCache) numEndpoints() int {
|
||||
num := 0
|
||||
func (spc *ServicePortCache) totals(maxEndpointsPerSlice int) (int, int, int) {
|
||||
var actualSlices, desiredSlices, endpoints int
|
||||
for _, eInfo := range spc.items {
|
||||
num += eInfo.Endpoints
|
||||
endpoints += eInfo.Endpoints
|
||||
actualSlices += eInfo.Slices
|
||||
desiredSlices += numDesiredSlices(eInfo.Endpoints, maxEndpointsPerSlice)
|
||||
}
|
||||
return num
|
||||
return actualSlices, desiredSlices, endpoints
|
||||
}
|
||||
|
||||
// UpdateServicePortCache updates a ServicePortCache in the global cache for a
|
||||
@ -96,15 +103,18 @@ func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
prevNumEndpoints := 0
|
||||
var prevActualSlices, prevDesiredSlices, prevEndpoints int
|
||||
if existingSPCache, ok := c.cache[serviceNN]; ok {
|
||||
prevNumEndpoints = existingSPCache.numEndpoints()
|
||||
prevActualSlices, prevDesiredSlices, prevEndpoints = existingSPCache.totals(int(c.maxEndpointsPerSlice))
|
||||
}
|
||||
|
||||
currNumEndpoints := spCache.numEndpoints()
|
||||
currActualSlices, currDesiredSlices, currEndpoints := spCache.totals(int(c.maxEndpointsPerSlice))
|
||||
// To keep numEndpoints up to date, add the difference between the number of
|
||||
// endpoints in the provided spCache and any spCache it might be replacing.
|
||||
c.numEndpoints = c.numEndpoints + currNumEndpoints - prevNumEndpoints
|
||||
c.numEndpoints = c.numEndpoints + currEndpoints - prevEndpoints
|
||||
|
||||
c.numSlicesDesired += currDesiredSlices - prevDesiredSlices
|
||||
c.numSlicesActual += currActualSlices - prevActualSlices
|
||||
|
||||
c.cache[serviceNN] = spCache
|
||||
c.updateMetrics()
|
||||
@ -117,45 +127,29 @@ func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
|
||||
defer c.lock.Unlock()
|
||||
|
||||
if spCache, ok := c.cache[serviceNN]; ok {
|
||||
c.numEndpoints = c.numEndpoints - spCache.numEndpoints()
|
||||
delete(c.cache, serviceNN)
|
||||
actualSlices, desiredSlices, endpoints := spCache.totals(int(c.maxEndpointsPerSlice))
|
||||
c.numEndpoints = c.numEndpoints - endpoints
|
||||
c.numSlicesDesired -= desiredSlices
|
||||
c.numSlicesActual -= actualSlices
|
||||
c.updateMetrics()
|
||||
}
|
||||
}
|
||||
delete(c.cache, serviceNN)
|
||||
|
||||
// metricsUpdate stores a desired and actual number of EndpointSlices.
|
||||
type metricsUpdate struct {
|
||||
desired, actual int
|
||||
}
|
||||
|
||||
// desiredAndActualSlices returns a metricsUpdate with the desired and actual
|
||||
// number of EndpointSlices given the current values in the cache.
|
||||
// Must be called holding lock.
|
||||
func (c *Cache) desiredAndActualSlices() metricsUpdate {
|
||||
mUpdate := metricsUpdate{}
|
||||
for _, spCache := range c.cache {
|
||||
for _, eInfo := range spCache.items {
|
||||
mUpdate.actual += eInfo.Slices
|
||||
mUpdate.desired += numDesiredSlices(eInfo.Endpoints, int(c.maxEndpointsPerSlice))
|
||||
}
|
||||
}
|
||||
return mUpdate
|
||||
}
|
||||
|
||||
// updateMetrics updates metrics with the values from this Cache.
|
||||
// Must be called holding lock.
|
||||
func (c *Cache) updateMetrics() {
|
||||
mUpdate := c.desiredAndActualSlices()
|
||||
NumEndpointSlices.WithLabelValues().Set(float64(mUpdate.actual))
|
||||
DesiredEndpointSlices.WithLabelValues().Set(float64(mUpdate.desired))
|
||||
NumEndpointSlices.WithLabelValues().Set(float64(c.numSlicesActual))
|
||||
DesiredEndpointSlices.WithLabelValues().Set(float64(c.numSlicesDesired))
|
||||
EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))
|
||||
}
|
||||
|
||||
// numDesiredSlices calculates the number of EndpointSlices that would exist
|
||||
// with ideal endpoint distribution.
|
||||
func numDesiredSlices(numEndpoints, maxPerSlice int) int {
|
||||
if numEndpoints <= maxPerSlice {
|
||||
func numDesiredSlices(numEndpoints, maxEndpointsPerSlice int) int {
|
||||
if numEndpoints <= maxEndpointsPerSlice {
|
||||
return 1
|
||||
}
|
||||
return int(math.Ceil(float64(numEndpoints) / float64(maxPerSlice)))
|
||||
return int(math.Ceil(float64(numEndpoints) / float64(maxEndpointsPerSlice)))
|
||||
}
|
||||
|
@ -17,11 +17,14 @@ limitations under the License.
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
discovery "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/controller/util/endpoint"
|
||||
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
|
||||
utilpointer "k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
func TestNumEndpointsAndSlices(t *testing.T) {
|
||||
@ -59,14 +62,57 @@ func TestNumEndpointsAndSlices(t *testing.T) {
|
||||
|
||||
func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) {
|
||||
t.Helper()
|
||||
mUpdate := c.desiredAndActualSlices()
|
||||
if mUpdate.desired != desired {
|
||||
t.Errorf("Expected numEndpointSlices to be %d, got %d", desired, mUpdate.desired)
|
||||
if c.numSlicesDesired != desired {
|
||||
t.Errorf("Expected numSlicesDesired to be %d, got %d", desired, c.numSlicesDesired)
|
||||
}
|
||||
if mUpdate.actual != actual {
|
||||
t.Errorf("Expected desiredEndpointSlices to be %d, got %d", actual, mUpdate.actual)
|
||||
if c.numSlicesActual != actual {
|
||||
t.Errorf("Expected numSlicesActual to be %d, got %d", actual, c.numSlicesActual)
|
||||
}
|
||||
if c.numEndpoints != numEndpoints {
|
||||
t.Errorf("Expected numEndpoints to be %d, got %d", numEndpoints, c.numEndpoints)
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkUpdateServicePortCache(b *testing.B, num int) {
|
||||
c := NewCache(int32(100))
|
||||
ns := "benchmark"
|
||||
httpKey := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: utilpointer.Int32Ptr(80)}})
|
||||
httpsKey := endpoint.NewPortMapKey([]discovery.EndpointPort{{Port: utilpointer.Int32Ptr(443)}})
|
||||
spCache := &ServicePortCache{items: map[endpointutil.PortMapKey]EfficiencyInfo{
|
||||
httpKey: {
|
||||
Endpoints: 182,
|
||||
Slices: 2,
|
||||
},
|
||||
httpsKey: {
|
||||
Endpoints: 356,
|
||||
Slices: 4,
|
||||
},
|
||||
}}
|
||||
|
||||
for i := 0; i < num; i++ {
|
||||
nName := types.NamespacedName{Namespace: ns, Name: fmt.Sprintf("service-%d", i)}
|
||||
c.UpdateServicePortCache(nName, spCache)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
nName := types.NamespacedName{Namespace: ns, Name: fmt.Sprintf("bench-%d", i)}
|
||||
c.UpdateServicePortCache(nName, spCache)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkUpdateServicePortCache100(b *testing.B) {
|
||||
benchmarkUpdateServicePortCache(b, 100)
|
||||
}
|
||||
|
||||
func BenchmarkUpdateServicePortCache1000(b *testing.B) {
|
||||
benchmarkUpdateServicePortCache(b, 1000)
|
||||
}
|
||||
|
||||
func BenchmarkUpdateServicePortCache10000(b *testing.B) {
|
||||
benchmarkUpdateServicePortCache(b, 10000)
|
||||
}
|
||||
|
||||
func BenchmarkUpdateServicePortCache100000(b *testing.B) {
|
||||
benchmarkUpdateServicePortCache(b, 100000)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user