Merge pull request #83257 from robscott/endpointslice-metrics

Adding initial EndpointSlice metrics.
Commit 0c32aa8910 by Kubernetes Prow Robot, 2019-11-05 15:30:31 -08:00, committed by GitHub
13 changed files with 657 additions and 56 deletions

View File

@@ -15,8 +15,8 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/apis/discovery/validation:go_default_library",
"//pkg/controller:go_default_library",
+"//pkg/controller/endpointslice/metrics:go_default_library",
"//pkg/controller/util/endpoint:go_default_library",
-"//pkg/util/hash:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
@@ -24,6 +24,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
+"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@@ -53,6 +54,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/controller:go_default_library",
+"//pkg/controller/endpointslice/metrics:go_default_library",
"//pkg/controller/util/endpoint:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
@@ -67,6 +69,8 @@ go_test(
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
+"//staging/src/k8s.io/component-base/metrics:go_default_library",
+"//vendor/github.com/prometheus/client_model/go:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
@@ -84,6 +88,7 @@ filegroup(
srcs = [
":package-srcs",
"//pkg/controller/endpointslice/config:all-srcs",
+"//pkg/controller/endpointslice/metrics:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

View File

@@ -20,6 +20,7 @@ import (
"sort"
discovery "k8s.io/api/discovery/v1alpha1"
+endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)
// endpointHash is used to uniquely identify endpoints. Only including addresses
@@ -38,7 +39,7 @@ func hashEndpoint(endpoint *discovery.Endpoint) endpointHash {
hashObj.Hostname = *endpoint.Hostname
}
-return endpointHash(deepHashObjectToString(hashObj))
+return endpointHash(endpointutil.DeepHashObjectToString(hashObj))
}
// endpointSet provides simple methods for comparing sets of Endpoints.
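As context for the change above: hashEndpoint builds a trimmed copy of the endpoint and now hashes it with the shared endpointutil.DeepHashObjectToString helper introduced later in this diff. A minimal standalone sketch of the property this relies on, using a hypothetical stand-in struct rather than the real discovery.Endpoint fields:

package main

import (
	"fmt"

	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)

// hashKey is a hypothetical stand-in for the trimmed endpoint copy built by hashEndpoint.
type hashKey struct {
	Addresses []string
	Hostname  string
}

func main() {
	a := hashKey{Addresses: []string{"10.0.0.1"}, Hostname: "pod-a"}
	b := hashKey{Addresses: []string{"10.0.0.1"}, Hostname: "pod-a"}

	// The hash depends only on the struct's contents, so equal values produce
	// equal strings and can safely be used as set keys.
	fmt.Println(endpointutil.DeepHashObjectToString(a) == endpointutil.DeepHashObjectToString(b)) // true
}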

View File

@@ -39,6 +39,7 @@ import (
"k8s.io/component-base/metrics/prometheus/ratelimiter"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
+endpointslicemetrics "k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)
@@ -72,6 +73,8 @@ func NewController(podInformer coreinformers.PodInformer,
ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_controller", client.DiscoveryV1alpha1().RESTClient().GetRateLimiter())
}
+endpointslicemetrics.RegisterMetrics()
c := &Controller{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint_slice"),
@@ -108,6 +111,7 @@ func NewController(podInformer coreinformers.PodInformer,
client: c.client,
nodeLister: c.nodeLister,
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
+metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
}
c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
@@ -251,6 +255,7 @@ func (c *Controller) syncService(key string) error {
if err != nil {
if apierrors.IsNotFound(err) {
c.triggerTimeTracker.DeleteService(namespace, name)
+c.reconciler.deleteService(namespace, name)
// The service has been deleted, return nil so that it won't be retried.
return nil
}

View File

@@ -0,0 +1,42 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"cache.go",
"metrics.go",
],
importpath = "k8s.io/kubernetes/pkg/controller/endpointslice/metrics",
visibility = ["//visibility:public"],
deps = [
"//pkg/controller/util/endpoint:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["cache_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/controller/util/endpoint:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

View File

@@ -0,0 +1,161 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"math"
"sync"
"k8s.io/apimachinery/pkg/types"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)
// NewCache returns a new Cache with the specified endpointsPerSlice.
func NewCache(endpointsPerSlice int32) *Cache {
return &Cache{
maxEndpointsPerSlice: endpointsPerSlice,
cache: map[types.NamespacedName]*ServicePortCache{},
}
}
// Cache tracks values for total numbers of desired endpoints as well as the
// efficiency of EndpointSlice endpoints distribution.
type Cache struct {
// maxEndpointsPerSlice references the maximum number of endpoints that
// should be added to an EndpointSlice.
maxEndpointsPerSlice int32
// lock protects changes to numEndpoints and cache.
lock sync.Mutex
// numEndpoints represents the total number of endpoints stored in
// EndpointSlices.
numEndpoints int
// cache stores a ServicePortCache grouped by NamespacedNames representing
// Services.
cache map[types.NamespacedName]*ServicePortCache
}
// ServicePortCache tracks values for total numbers of desired endpoints as well
// as the efficiency of EndpointSlice endpoints distribution for each unique
// Service Port combination.
type ServicePortCache struct {
items map[endpointutil.PortMapKey]EfficiencyInfo
}
// EfficiencyInfo stores the number of Endpoints and Slices for calculating
// total numbers of desired endpoints and the efficiency of EndpointSlice
// endpoints distribution.
type EfficiencyInfo struct {
Endpoints int
Slices int
}
// NewServicePortCache initializes and returns a new ServicePortCache.
func NewServicePortCache() *ServicePortCache {
return &ServicePortCache{
items: map[endpointutil.PortMapKey]EfficiencyInfo{},
}
}
// Set updates the ServicePortCache to contain the provided EfficiencyInfo
// for the provided PortMapKey.
func (spc *ServicePortCache) Set(pmKey endpointutil.PortMapKey, eInfo EfficiencyInfo) {
spc.items[pmKey] = eInfo
}
// numEndpoints returns the total number of endpoints represented by a
// ServicePortCache.
func (spc *ServicePortCache) numEndpoints() int {
num := 0
for _, eInfo := range spc.items {
num += eInfo.Endpoints
}
return num
}
// UpdateServicePortCache updates a ServicePortCache in the global cache for a
// given Service and updates the corresponding metrics.
// Parameters:
// * serviceNN refers to a NamespacedName representing the Service.
// * spCache refers to a ServicePortCache for the specified Service.
func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *ServicePortCache) {
c.lock.Lock()
defer c.lock.Unlock()
prevNumEndpoints := 0
if existingSPCache, ok := c.cache[serviceNN]; ok {
prevNumEndpoints = existingSPCache.numEndpoints()
}
currNumEndpoints := spCache.numEndpoints()
// To keep numEndpoints up to date, add the difference between the number of
// endpoints in the provided spCache and any spCache it might be replacing.
c.numEndpoints = c.numEndpoints + currNumEndpoints - prevNumEndpoints
c.cache[serviceNN] = spCache
c.updateMetrics()
}
// DeleteService removes references of a Service from the global cache and
// updates the corresponding metrics.
func (c *Cache) DeleteService(serviceNN types.NamespacedName) {
c.lock.Lock()
defer c.lock.Unlock()
if spCache, ok := c.cache[serviceNN]; ok {
c.numEndpoints = c.numEndpoints - spCache.numEndpoints()
delete(c.cache, serviceNN)
c.updateMetrics()
}
}
// metricsUpdate stores a desired and actual number of EndpointSlices.
type metricsUpdate struct {
desired, actual int
}
// desiredAndActualSlices returns a metricsUpdate with the desired and actual
// number of EndpointSlices given the current values in the cache.
// Must be called holding lock.
func (c *Cache) desiredAndActualSlices() metricsUpdate {
mUpdate := metricsUpdate{}
for _, spCache := range c.cache {
for _, eInfo := range spCache.items {
mUpdate.actual += eInfo.Slices
mUpdate.desired += numDesiredSlices(eInfo.Endpoints, int(c.maxEndpointsPerSlice))
}
}
return mUpdate
}
// updateMetrics updates metrics with the values from this Cache.
// Must be called holding lock.
func (c *Cache) updateMetrics() {
mUpdate := c.desiredAndActualSlices()
NumEndpointSlices.WithLabelValues().Set(float64(mUpdate.actual))
DesiredEndpointSlices.WithLabelValues().Set(float64(mUpdate.desired))
EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints))
}
// numDesiredSlices calculates the number of EndpointSlices that would exist
// with ideal endpoint distribution.
func numDesiredSlices(numEndpoints, maxPerSlice int) int {
if numEndpoints <= maxPerSlice {
return 1
}
return int(math.Ceil(float64(numEndpoints) / float64(maxPerSlice)))
}
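A minimal usage sketch of the types defined above, mirroring cache_test.go; the Service name and the numbers are illustrative. With maxEndpointsPerSlice set to 100, a Service/port combination holding 250 endpoints in 4 actual slices contributes ceil(250/100) = 3 desired slices:

package main

import (
	discovery "k8s.io/api/discovery/v1alpha1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)

func main() {
	// Register the gauges once, as NewController does.
	metrics.RegisterMetrics()

	// Cache configured for at most 100 endpoints per EndpointSlice.
	c := metrics.NewCache(100)

	port := int32(80)
	pmKey := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &port}})

	// One Service/port combination: 250 endpoints spread over 4 slices.
	spCache := metrics.NewServicePortCache()
	spCache.Set(pmKey, metrics.EfficiencyInfo{Endpoints: 250, Slices: 4})

	// Sets endpoints_desired=250, num_endpoint_slices=4, desired_endpoint_slices=3.
	c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns", Name: "svc"}, spCache)

	// Deleting the Service removes its contribution from all three gauges.
	c.DeleteService(types.NamespacedName{Namespace: "ns", Name: "svc"})
}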

View File

@@ -0,0 +1,72 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"testing"
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/types"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)
func TestNumEndpointsAndSlices(t *testing.T) {
c := NewCache(int32(100))
p80 := int32(80)
p443 := int32(443)
pmKey80443 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}})
pmKey80 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}})
spCacheEfficient := NewServicePortCache()
spCacheEfficient.Set(pmKey80, EfficiencyInfo{Endpoints: 45, Slices: 1})
spCacheEfficient.Set(pmKey80443, EfficiencyInfo{Endpoints: 35, Slices: 1})
spCacheInefficient := NewServicePortCache()
spCacheInefficient.Set(pmKey80, EfficiencyInfo{Endpoints: 12, Slices: 5})
spCacheInefficient.Set(pmKey80443, EfficiencyInfo{Endpoints: 18, Slices: 8})
c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, spCacheInefficient)
expectNumEndpointsAndSlices(t, c, 2, 13, 30)
c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc2"}, spCacheEfficient)
expectNumEndpointsAndSlices(t, c, 4, 15, 110)
c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc3"}, spCacheInefficient)
expectNumEndpointsAndSlices(t, c, 6, 28, 140)
c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, spCacheEfficient)
expectNumEndpointsAndSlices(t, c, 6, 17, 190)
c.DeleteService(types.NamespacedName{Namespace: "ns1", Name: "svc3"})
expectNumEndpointsAndSlices(t, c, 4, 4, 160)
}
func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) {
t.Helper()
mUpdate := c.desiredAndActualSlices()
if mUpdate.desired != desired {
t.Errorf("Expected numEndpointSlices to be %d, got %d", desired, mUpdate.desired)
}
if mUpdate.actual != actual {
t.Errorf("Expected desiredEndpointSlices to be %d, got %d", actual, mUpdate.actual)
}
if c.numEndpoints != numEndpoints {
t.Errorf("Expected numEndpoints to be %d, got %d", numEndpoints, c.numEndpoints)
}
}
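For readers checking the arithmetic, a short worked derivation of the first expectation above (a sketch assuming maxEndpointsPerSlice = 100, as passed to NewCache):

// After c.UpdateServicePortCache(ns1/svc1, spCacheInefficient):
//   desired slices    = ceil(12/100) + ceil(18/100) = 1 + 1 = 2
//   actual slices     = 5 + 8                        = 13
//   desired endpoints = 12 + 18                      = 30
// which is exactly expectNumEndpointsAndSlices(t, c, 2, 13, 30).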

View File

@@ -0,0 +1,110 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
)
// EndpointSliceSubsystem - subsystem name used for Endpoint Slices.
const EndpointSliceSubsystem = "endpoint_slice_controller"
var (
// EndpointsAddedPerSync tracks the number of endpoints added on each
// Service sync.
EndpointsAddedPerSync = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: EndpointSliceSubsystem,
Name: "endpoints_added_per_sync",
Help: "Number of endpoints added on each Service sync",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(2, 2, 15),
},
[]string{},
)
// EndpointsRemovedPerSync tracks the number of endpoints removed on each
// Service sync.
EndpointsRemovedPerSync = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: EndpointSliceSubsystem,
Name: "endpoints_removed_per_sync",
Help: "Number of endpoints removed on each Service sync",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(2, 2, 15),
},
[]string{},
)
// EndpointsDesired tracks the total number of desired endpoints.
EndpointsDesired = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: EndpointSliceSubsystem,
Name: "endpoints_desired",
Help: "Number of endpoints desired",
StabilityLevel: metrics.ALPHA,
},
[]string{},
)
// NumEndpointSlices tracks the number of EndpointSlices in a cluster.
NumEndpointSlices = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: EndpointSliceSubsystem,
Name: "num_endpoint_slices",
Help: "Number of EndpointSlices",
StabilityLevel: metrics.ALPHA,
},
[]string{},
)
// DesiredEndpointSlices tracks the number of EndpointSlices that would
// exist with perfect endpoint allocation.
DesiredEndpointSlices = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Subsystem: EndpointSliceSubsystem,
Name: "desired_endpoint_slices",
Help: "Number of EndpointSlices that would exist with perfect endpoint allocation",
StabilityLevel: metrics.ALPHA,
},
[]string{},
)
// EndpointSliceChanges tracks the number of changes to Endpoint Slices.
EndpointSliceChanges = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: EndpointSliceSubsystem,
Name: "changes",
Help: "Number of EndpointSlice changes",
StabilityLevel: metrics.ALPHA,
},
[]string{"operation"},
)
)
var registerMetrics sync.Once
// RegisterMetrics registers EndpointSlice metrics.
func RegisterMetrics() {
registerMetrics.Do(func() {
legacyregistry.MustRegister(EndpointsAddedPerSync)
legacyregistry.MustRegister(EndpointsRemovedPerSync)
legacyregistry.MustRegister(EndpointsDesired)
legacyregistry.MustRegister(NumEndpointSlices)
legacyregistry.MustRegister(DesiredEndpointSlices)
legacyregistry.MustRegister(EndpointSliceChanges)
})
}
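A small sketch of how a caller records these metrics; it mirrors what the reconciler does later in this diff, with illustrative values:

package main

import "k8s.io/kubernetes/pkg/controller/endpointslice/metrics"

func main() {
	// Registration is guarded by sync.Once, so repeated calls are safe.
	metrics.RegisterMetrics()

	// Per-sync endpoint churn goes into the histograms.
	metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(250))
	metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(0))

	// The counter is partitioned by the kind of write made to the API server.
	metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
	metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
}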

View File

@@ -25,10 +25,12 @@ import (
discovery "k8s.io/api/discovery/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
+"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)
@@ -38,6 +40,7 @@ type reconciler struct {
client clientset.Interface
nodeLister corelisters.NodeLister
maxEndpointsPerSlice int32
+metricsCache *metrics.Cache
}
// endpointMeta includes the attributes we group slices on, this type helps with
@@ -53,20 +56,23 @@ type endpointMeta struct {
// to ensure the desired set of pods are represented by endpoint slices.
func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error {
// Build data structures for existing state.
-existingSlicesByPortMap := map[portMapKey][]*discovery.EndpointSlice{}
+existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
+numExistingEndpoints := 0
for _, existingSlice := range existingSlices {
-epHash := newPortMapKey(existingSlice.Ports)
+epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
+numExistingEndpoints += len(existingSlice.Endpoints)
}
// Build data structures for desired state.
-desiredMetaByPortMap := map[portMapKey]*endpointMeta{}
-desiredEndpointsByPortMap := map[portMapKey]endpointSet{}
+desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{}
+desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointSet{}
+numDesiredEndpoints := 0
for _, pod := range pods {
if endpointutil.ShouldPodBeInEndpoints(pod) {
endpointPorts := getEndpointPorts(service, pod)
-epHash := newPortMapKey(endpointPorts)
+epHash := endpointutil.NewPortMapKey(endpointPorts)
if _, ok := desiredEndpointsByPortMap[epHash]; !ok {
desiredEndpointsByPortMap[epHash] = endpointSet{}
}
@@ -86,17 +92,31 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
}
endpoint := podToEndpoint(pod, node)
desiredEndpointsByPortMap[epHash].Insert(&endpoint)
+numDesiredEndpoints++
}
}
slicesToCreate := []*discovery.EndpointSlice{}
slicesToUpdate := []*discovery.EndpointSlice{}
sliceNamesToDelete := sets.String{}
+spMetrics := metrics.NewServicePortCache()
+totalAdded := 0
+totalRemoved := 0
// Determine changes necessary for each group of slices by port map.
for portMap, desiredEndpoints := range desiredEndpointsByPortMap {
-pmSlicesToCreate, pmSlicesToUpdate, pmSliceNamesToDelete := r.reconcileByPortMapping(
+numEndpoints := len(desiredEndpoints)
+pmSlicesToCreate, pmSlicesToUpdate, pmSliceNamesToDelete, added, removed := r.reconcileByPortMapping(
service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap])
+totalAdded += added
+totalRemoved += removed
+spMetrics.Set(portMap, metrics.EfficiencyInfo{
+Endpoints: numEndpoints,
+Slices: len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSliceNamesToDelete),
+})
if len(pmSlicesToCreate) > 0 {
slicesToCreate = append(slicesToCreate, pmSlicesToCreate...)
}
@@ -122,8 +142,18 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
if len(existingSlices) == sliceNamesToDelete.Len() && len(slicesToCreate) < 1 {
placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}})
slicesToCreate = append(slicesToCreate, placeholderSlice)
+spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{
+Endpoints: 0,
+Slices: 1,
+})
}
+metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded))
+metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved))
+serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
+r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)
return r.finalize(service, slicesToCreate, slicesToUpdate, sliceNamesToDelete, triggerTime)
}
@@ -156,6 +186,8 @@ func (r *reconciler) finalize(
return nil
}
errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err))
+} else {
+metrics.EndpointSliceChanges.WithLabelValues("create").Inc()
}
}
@@ -164,6 +196,8 @@ func (r *reconciler) finalize(
_, err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Update(endpointSlice)
if err != nil {
errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err))
+} else {
+metrics.EndpointSliceChanges.WithLabelValues("update").Inc()
}
}
@@ -172,6 +206,8 @@ func (r *reconciler) finalize(
err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Delete(sliceName, &metav1.DeleteOptions{})
if err != nil {
errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", sliceName, service.Namespace, service.Name, err))
+} else {
+metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
}
}
@@ -192,11 +228,12 @@ func (r *reconciler) reconcileByPortMapping(
existingSlices []*discovery.EndpointSlice,
desiredSet endpointSet,
endpointMeta *endpointMeta,
-) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, sets.String) {
+) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, sets.String, int, int) {
slicesByName := map[string]*discovery.EndpointSlice{}
sliceNamesUnchanged := sets.String{}
sliceNamesToUpdate := sets.String{}
sliceNamesToDelete := sets.String{}
+numRemoved := 0
// 1. Iterate through existing slices to delete endpoints no longer desired
// and update endpoints that have changed
@@ -222,6 +259,9 @@ func (r *reconciler) reconcileByPortMapping(
// If an endpoint was updated or removed, mark for update or delete
if endpointUpdated || len(existingSlice.Endpoints) != len(newEndpoints) {
+if len(existingSlice.Endpoints) > len(newEndpoints) {
+numRemoved += len(existingSlice.Endpoints) - len(newEndpoints)
+}
if len(newEndpoints) == 0 {
// if no endpoints desired in this slice, mark for deletion
sliceNamesToDelete.Insert(existingSlice.Name)
@@ -236,6 +276,8 @@ func (r *reconciler) reconcileByPortMapping(
}
}
+numAdded := desiredSet.Len()
// 2. If we still have desired endpoints to add and slices marked for update,
// iterate through the slices and fill them up with the desired endpoints.
if desiredSet.Len() > 0 && sliceNamesToUpdate.Len() > 0 {
@@ -302,5 +344,9 @@ func (r *reconciler) reconcileByPortMapping(
slicesToUpdate = append(slicesToUpdate, slicesByName[sliceName])
}
-return slicesToCreate, slicesToUpdate, sliceNamesToDelete
+return slicesToCreate, slicesToUpdate, sliceNamesToDelete, numAdded, numRemoved
+}
+
+func (r *reconciler) deleteService(namespace, name string) {
+r.metricsCache.DeleteService(types.NamespacedName{Namespace: namespace, Name: name})
}
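To make the bookkeeping above concrete, here is a hedged walkthrough (as Go comments) of the shrink case exercised by TestReconcileEndpointSlicesMetrics later in this diff: one existing slice with 20 endpoints reconciled against only 10 desired pods.

// reconcileByPortMapping, step 1: the existing slice keeps the 10 endpoints that
// are still desired and drops the other 10, so it is marked for update and
//   numRemoved += 20 - 10 = 10
// Matched endpoints are consumed from desiredSet, so afterwards
//   numAdded = desiredSet.Len() = 0
//
// reconcile() then records, roughly:
//   metrics.EndpointsAddedPerSync.WithLabelValues().Observe(0)
//   metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(10)
//   metrics.EndpointSliceChanges.WithLabelValues("update").Inc()   // via finalize
//   spMetrics.Set(portMap, metrics.EfficiencyInfo{Endpoints: 10, Slices: 1})
//   r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)
// which matches the removedPerSync: 10, numUpdated: 1, desiredEndpoints: 10
// expectations in that test.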

View File

@@ -17,9 +17,11 @@ limitations under the License.
package endpointslice
import (
+"fmt"
"testing"
"time"
+dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
@@ -31,7 +33,9 @@ import (
"k8s.io/client-go/kubernetes/fake"
corelisters "k8s.io/client-go/listers/core/v1"
k8stesting "k8s.io/client-go/testing"
+compmetrics "k8s.io/component-base/metrics"
"k8s.io/kubernetes/pkg/controller"
+"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
utilpointer "k8s.io/utils/pointer"
)
@@ -40,8 +44,9 @@ var defaultMaxEndpointsPerSlice = int32(100)
// Even when there are no pods, we want to have a placeholder slice for each service
func TestReconcileEmpty(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, _ := newServiceAndendpointMeta("foo", namespace)
+svc, _ := newServiceAndEndpointMeta("foo", namespace)
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &svc, []*corev1.Pod{}, []*discovery.EndpointSlice{}, time.Now())
@@ -54,14 +59,16 @@ func TestReconcileEmpty(t *testing.T) {
assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
+expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
}
// Given a single pod matching a service selector and no existing endpoint slices,
// a slice should be created
func TestReconcile1Pod(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, _ := newServiceAndendpointMeta("foo", namespace)
+svc, _ := newServiceAndEndpointMeta("foo", namespace)
pod1 := newPod(1, namespace, true, 1)
pod1.Spec.Hostname = "example-hostname"
node1 := &corev1.Node{
@@ -100,14 +107,16 @@ func TestReconcile1Pod(t *testing.T) {
Name: "pod1",
},
}}, slices[0].Endpoints)
+expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
}
// given an existing endpoint slice and no pods matching the service, the existing
// slice should be updated to a placeholder (not deleted)
func TestReconcile1EndpointSlice(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, endpointMeta := newServiceAndendpointMeta("foo", namespace)
+svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
_, createErr := client.DiscoveryV1alpha1().EndpointSlices(namespace).Create(endpointSlice1)
@@ -127,14 +136,16 @@ func TestReconcile1EndpointSlice(t *testing.T) {
assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName])
assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
+expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
}
// a simple use case with 250 pods matching a service and no existing slices
// reconcile should create 3 slices, completely filling 2 of them
func TestReconcileManyPods(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, _ := newServiceAndendpointMeta("foo", namespace)
+svc, _ := newServiceAndEndpointMeta("foo", namespace)
// start with 250 pods
pods := []*corev1.Pod{}
@@ -152,6 +163,7 @@ func TestReconcileManyPods(t *testing.T) {
// Two endpoint slices should be completely full, the remainder should be in another one
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
+expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0})
}
// now with preexisting slices, we have 250 pods matching a service
@@ -164,8 +176,9 @@ func TestReconcileManyPods(t *testing.T) {
// this approach requires 1 update + 1 create instead of 2 updates + 1 create
func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, endpointMeta := newServiceAndendpointMeta("foo", namespace)
+svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
// start with 250 pods
pods := []*corev1.Pod{}
@@ -200,6 +213,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// 1 new slice (0->100) + 1 updated slice (62->89)
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100})
+expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0})
}
// now with preexisting slices, we have 300 pods matching a service
@@ -214,8 +228,9 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// this approach requires 2 creates instead of 2 updates + 1 create
func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, endpointMeta := newServiceAndendpointMeta("foo", namespace)
+svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
// start with 300 pods
pods := []*corev1.Pod{}
@@ -249,6 +264,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
// 2 new slices (100, 52) in addition to existing slices (74, 74)
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52})
+expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0})
}
// In some cases, such as a service port change, all slices for that service will require a change
@@ -256,7 +272,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
func TestReconcileEndpointSlicesUpdating(t *testing.T) {
client := newClientset()
namespace := "test"
-svc, _ := newServiceAndendpointMeta("foo", namespace)
+svc, _ := newServiceAndEndpointMeta("foo", namespace)
// start with 250 pods
pods := []*corev1.Pod{}
@@ -290,8 +306,9 @@ func TestReconcileEndpointSlicesUpdating(t *testing.T) {
// reconcile repacks the endpoints into 3 slices, and deletes the extras
func TestReconcileEndpointSlicesRecycling(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, endpointMeta := newServiceAndendpointMeta("foo", namespace)
+svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
// start with 300 pods
pods := []*corev1.Pod{}
@@ -327,6 +344,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
// thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100})
+expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7})
}
// In this test, we want to verify that endpoints are added to a slice that will
@@ -334,8 +352,9 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
// for update.
func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
-svc, endpointMeta := newServiceAndendpointMeta("foo", namespace)
+svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
existingSlices := []*discovery.EndpointSlice{}
pods := []*corev1.Pod{}
@@ -378,6 +397,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
// ensure that both endpoint slices have been updated
expectActions(t, client.Actions(), 2, "update", "endpointslices")
+expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0})
// additional pods should get added to fuller slice
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})
@@ -387,6 +407,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
// This test ensures that EndpointSlices are grouped correctly in that case.
func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
client := newClientset()
+setupMetrics()
namespace := "test"
portNameIntStr := intstr.IntOrString{
@@ -425,6 +446,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
// reconcile should create 5 endpoint slices
assert.Equal(t, 5, len(client.Actions()), "Expected 5 client actions as part of reconcile")
expectActions(t, client.Actions(), 5, "create", "endpointslices")
+expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0})
fetchedSlices := fetchEndpointSlices(t, client, namespace)
@@ -454,7 +476,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
// appropriate endpoints distribution among slices
func TestReconcileMaxEndpointsPerSlice(t *testing.T) {
namespace := "test"
-svc, _ := newServiceAndendpointMeta("foo", namespace)
+svc, _ := newServiceAndEndpointMeta("foo", namespace)
// start with 250 pods
pods := []*corev1.Pod{}
@@ -466,33 +488,69 @@ func TestReconcileMaxEndpointsPerSlice(t *testing.T) {
testCases := []struct {
maxEndpointsPerSlice int32
expectedSliceLengths []int
+expectedMetricValues expectedMetrics
}{
{
maxEndpointsPerSlice: int32(50),
expectedSliceLengths: []int{50, 50, 50, 50, 50},
+expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5},
}, {
maxEndpointsPerSlice: int32(80),
expectedSliceLengths: []int{80, 80, 80, 10},
+expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4},
}, {
maxEndpointsPerSlice: int32(150),
expectedSliceLengths: []int{150, 100},
+expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2},
}, {
maxEndpointsPerSlice: int32(250),
expectedSliceLengths: []int{250},
+expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
}, {
maxEndpointsPerSlice: int32(500),
expectedSliceLengths: []int{250},
+expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
},
}
for _, testCase := range testCases {
-client := newClientset()
-r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, testCase.maxEndpointsPerSlice)
-reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
-expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), testCase.expectedSliceLengths)
+t.Run(fmt.Sprintf("maxEndpointsPerSlice: %d", testCase.maxEndpointsPerSlice), func(t *testing.T) {
+client := newClientset()
+setupMetrics()
+r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, testCase.maxEndpointsPerSlice)
+reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
+expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), testCase.expectedSliceLengths)
+expectMetrics(t, testCase.expectedMetricValues)
+})
}
}
func TestReconcileEndpointSlicesMetrics(t *testing.T) {
client := newClientset()
setupMetrics()
namespace := "test"
svc, _ := newServiceAndEndpointMeta("foo", namespace)
// start with 20 pods
pods := []*corev1.Pod{}
for i := 0; i < 20; i++ {
pods = append(pods, newPod(i, namespace, true, 1))
}
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now())
actions := client.Actions()
assert.Equal(t, 1, len(actions), "Expected 1 additional client action as part of reconcile")
assert.True(t, actions[0].Matches("create", "endpointslices"), "First action should be create endpoint slice")
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
fetchedSlices := fetchEndpointSlices(t, client, namespace)
reconcileHelper(t, r, &svc, pods[0:10], []*discovery.EndpointSlice{&fetchedSlices[0]}, time.Now())
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0})
}
// Test Helpers
func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler {
@@ -507,6 +565,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer
client: client,
nodeLister: corelisters.NewNodeLister(indexer),
maxEndpointsPerSlice: maxEndpointsPerSlice,
+metricsCache: metrics.NewCache(maxEndpointsPerSlice),
}
}
@@ -604,3 +663,99 @@ func reconcileHelper(t *testing.T, r *reconciler, service *corev1.Service, pods
t.Fatalf("Expected no error reconciling Endpoint Slices, got: %v", err)
}
}
// Metrics helpers
type expectedMetrics struct {
desiredSlices int
actualSlices int
desiredEndpoints int
addedPerSync int
removedPerSync int
numCreated int
numUpdated int
numDeleted int
}
func expectMetrics(t *testing.T, em expectedMetrics) {
t.Helper()
actualDesiredSlices := getGaugeMetricValue(t, metrics.DesiredEndpointSlices.WithLabelValues())
if actualDesiredSlices != float64(em.desiredSlices) {
t.Errorf("Expected desiredEndpointSlices to be %d, got %v", em.desiredSlices, actualDesiredSlices)
}
actualNumSlices := getGaugeMetricValue(t, metrics.NumEndpointSlices.WithLabelValues())
if actualNumSlices != float64(em.actualSlices) {
t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices)
}
actualEndpointsDesired := getGaugeMetricValue(t, metrics.EndpointsDesired.WithLabelValues())
if actualEndpointsDesired != float64(em.desiredEndpoints) {
t.Errorf("Expected desiredEndpoints to be %d, got %v", em.desiredEndpoints, actualEndpointsDesired)
}
actualAddedPerSync := getHistogramMetricValue(t, metrics.EndpointsAddedPerSync.WithLabelValues())
if actualAddedPerSync != float64(em.addedPerSync) {
t.Errorf("Expected endpointsAddedPerSync to be %d, got %v", em.addedPerSync, actualAddedPerSync)
}
actualRemovedPerSync := getHistogramMetricValue(t, metrics.EndpointsRemovedPerSync.WithLabelValues())
if actualRemovedPerSync != float64(em.removedPerSync) {
t.Errorf("Expected endpointsRemovedPerSync to be %d, got %v", em.removedPerSync, actualRemovedPerSync)
}
actualCreated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("create"))
if actualCreated != float64(em.numCreated) {
t.Errorf("Expected endpointSliceChangesCreated to be %d, got %v", em.numCreated, actualCreated)
}
actualUpdated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("update"))
if actualUpdated != float64(em.numUpdated) {
t.Errorf("Expected endpointSliceChangesUpdated to be %d, got %v", em.numUpdated, actualUpdated)
}
actualDeleted := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("delete"))
if actualDeleted != float64(em.numDeleted) {
t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted)
}
}
func setupMetrics() {
metrics.RegisterMetrics()
metrics.NumEndpointSlices.Delete(map[string]string{})
metrics.DesiredEndpointSlices.Delete(map[string]string{})
metrics.EndpointsDesired.Delete(map[string]string{})
metrics.EndpointsAddedPerSync.Delete(map[string]string{})
metrics.EndpointsRemovedPerSync.Delete(map[string]string{})
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"})
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"})
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"})
}
func getGaugeMetricValue(t *testing.T, metric compmetrics.GaugeMetric) float64 {
t.Helper()
metricProto := &dto.Metric{}
if err := metric.Write(metricProto); err != nil {
t.Errorf("Error writing metric: %v", err)
}
return metricProto.Gauge.GetValue()
}
func getCounterMetricValue(t *testing.T, metric compmetrics.CounterMetric) float64 {
t.Helper()
metricProto := &dto.Metric{}
if err := metric.(compmetrics.Metric).Write(metricProto); err != nil {
t.Errorf("Error writing metric: %v", err)
}
return metricProto.Counter.GetValue()
}
func getHistogramMetricValue(t *testing.T, metric compmetrics.ObserverMetric) float64 {
t.Helper()
metricProto := &dto.Metric{}
if err := metric.(compmetrics.Metric).Write(metricProto); err != nil {
t.Errorf("Error writing metric: %v", err)
}
return metricProto.Histogram.GetSampleSum()
}
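The read pattern used by getGaugeMetricValue and friends also works outside of tests: a metric handle can be written into a client_model protobuf to inspect its current value. A standalone sketch using the gauges from this PR (the value 42 is arbitrary):

package main

import (
	"fmt"

	dto "github.com/prometheus/client_model/go"
	"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
)

func main() {
	metrics.RegisterMetrics()
	metrics.EndpointsDesired.WithLabelValues().Set(42)

	// Dump the gauge's current state into a client_model Metric and read it back.
	m := &dto.Metric{}
	if err := metrics.EndpointsDesired.WithLabelValues().Write(m); err != nil {
		panic(err)
	}
	fmt.Println(m.Gauge.GetValue()) // 42
}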

View File

@@ -17,11 +17,8 @@ limitations under the License.
package endpointslice
import (
-"crypto/md5"
-"encoding/hex"
"fmt"
"reflect"
-"sort"
"time"
corev1 "k8s.io/api/core/v1"
@@ -33,7 +30,6 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/discovery/validation"
-"k8s.io/kubernetes/pkg/util/hash"
)
// podEndpointChanged returns true if the results of podToEndpoint are different
@@ -235,21 +231,6 @@ func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTim
}
}
-// deepHashObject creates a unique hash string from a go object.
-func deepHashObjectToString(objectToWrite interface{}) string {
-hasher := md5.New()
-hash.DeepHashObject(hasher, objectToWrite)
-return hex.EncodeToString(hasher.Sum(nil)[0:])
-}
-// portMapKey is used to uniquely identify groups of endpoint ports.
-type portMapKey string
-func newPortMapKey(endpointPorts []discovery.EndpointPort) portMapKey {
-sort.Sort(portsInOrder(endpointPorts))
-return portMapKey(deepHashObjectToString(endpointPorts))
-}
// endpointSliceEndpointLen helps sort endpoint slices by the number of
// endpoints they contain.
type endpointSliceEndpointLen []*discovery.EndpointSlice
@@ -259,14 +240,3 @@ func (sl endpointSliceEndpointLen) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i]
func (sl endpointSliceEndpointLen) Less(i, j int) bool {
return len(sl[i].Endpoints) > len(sl[j].Endpoints)
}
-// portsInOrder helps sort endpoint ports in a consistent way for hashing.
-type portsInOrder []discovery.EndpointPort
-func (sl portsInOrder) Len() int { return len(sl) }
-func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
-func (sl portsInOrder) Less(i, j int) bool {
-h1 := deepHashObjectToString(sl[i])
-h2 := deepHashObjectToString(sl[j])
-return h1 < h2
-}

View File

@@ -180,7 +180,7 @@ func TestPodToEndpoint(t *testing.T) {
}
}
-func TestPodChangedWithpodEndpointChanged(t *testing.T) {
+func TestPodChangedWithPodEndpointChanged(t *testing.T) {
podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
ns := "test"
podStore.Add(newPod(1, ns, true, 1))
@@ -287,7 +287,7 @@ func newClientset() *fake.Clientset {
return client
}
-func newServiceAndendpointMeta(name, namespace string) (v1.Service, endpointMeta) {
+func newServiceAndEndpointMeta(name, namespace string) (v1.Service, endpointMeta) {
portNum := int32(80)
portNameIntStr := intstr.IntOrString{
Type: intstr.Int,

View File

@@ -11,7 +11,9 @@ go_library(
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/controller:go_default_library",
+"//pkg/util/hash:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
+"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",

View File

@@ -17,21 +17,42 @@ limitations under the License.
package endpoint
import (
+"crypto/md5"
+"encoding/hex"
"fmt"
"reflect"
+"sort"
v1 "k8s.io/api/core/v1"
+discovery "k8s.io/api/discovery/v1alpha1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
+"k8s.io/kubernetes/pkg/util/hash"
)
// EndpointsMatch is a type of function that returns true if pod endpoints match.
type EndpointsMatch func(*v1.Pod, *v1.Pod) bool
// PortMapKey is used to uniquely identify groups of endpoint ports.
type PortMapKey string
// NewPortMapKey generates a PortMapKey from endpoint ports.
func NewPortMapKey(endpointPorts []discovery.EndpointPort) PortMapKey {
sort.Sort(portsInOrder(endpointPorts))
return PortMapKey(DeepHashObjectToString(endpointPorts))
}
// DeepHashObjectToString creates a unique hash string from a go object.
func DeepHashObjectToString(objectToWrite interface{}) string {
hasher := md5.New()
hash.DeepHashObject(hasher, objectToWrite)
return hex.EncodeToString(hasher.Sum(nil)[0:])
}
// ShouldPodBeInEndpoints returns true if a specified pod should be in an
// endpoints object.
func ShouldPodBeInEndpoints(pod *v1.Pod) bool {
@@ -172,3 +193,14 @@ func determineNeededServiceUpdates(oldServices, services sets.String, podChanged
}
return services
}
// portsInOrder helps sort endpoint ports in a consistent way for hashing.
type portsInOrder []discovery.EndpointPort
func (sl portsInOrder) Len() int { return len(sl) }
func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] }
func (sl portsInOrder) Less(i, j int) bool {
h1 := DeepHashObjectToString(sl[i])
h2 := DeepHashObjectToString(sl[j])
return h1 < h2
}
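Finally, a small sketch of the property the portsInOrder sort provides: NewPortMapKey hashes the ports after sorting them, so the key does not depend on the order in which the ports are listed (the port values here are illustrative).

package main

import (
	"fmt"

	discovery "k8s.io/api/discovery/v1alpha1"
	endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
)

func main() {
	http := int32(80)
	https := int32(443)

	k1 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &http}, {Port: &https}})
	k2 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &https}, {Port: &http}})

	// Same set of ports, same PortMapKey, regardless of ordering.
	fmt.Println(k1 == k2) // true
}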