diff --git a/pkg/controller/endpointslice/BUILD b/pkg/controller/endpointslice/BUILD index ebc614d6879..0d47680a4de 100644 --- a/pkg/controller/endpointslice/BUILD +++ b/pkg/controller/endpointslice/BUILD @@ -15,8 +15,8 @@ go_library( "//pkg/apis/core:go_default_library", "//pkg/apis/discovery/validation:go_default_library", "//pkg/controller:go_default_library", + "//pkg/controller/endpointslice/metrics:go_default_library", "//pkg/controller/util/endpoint:go_default_library", - "//pkg/util/hash:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", @@ -24,6 +24,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", @@ -53,6 +54,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/controller:go_default_library", + "//pkg/controller/endpointslice/metrics:go_default_library", "//pkg/controller/util/endpoint:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", @@ -67,6 +69,8 @@ go_test( "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/component-base/metrics:go_default_library", + "//vendor/github.com/prometheus/client_model/go:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/k8s.io/utils/pointer:go_default_library", ], @@ -84,6 +88,7 @@ filegroup( srcs = [ ":package-srcs", "//pkg/controller/endpointslice/config:all-srcs", + "//pkg/controller/endpointslice/metrics:all-srcs", ], tags = ["automanaged"], visibility = ["//visibility:public"], diff --git a/pkg/controller/endpointslice/endpointset.go b/pkg/controller/endpointslice/endpointset.go index 604f38ec82c..23566ee5a7d 100644 --- a/pkg/controller/endpointslice/endpointset.go +++ b/pkg/controller/endpointslice/endpointset.go @@ -20,6 +20,7 @@ import ( "sort" discovery "k8s.io/api/discovery/v1alpha1" + endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" ) // endpointHash is used to uniquely identify endpoints. Only including addresses @@ -38,7 +39,7 @@ func hashEndpoint(endpoint *discovery.Endpoint) endpointHash { hashObj.Hostname = *endpoint.Hostname } - return endpointHash(deepHashObjectToString(hashObj)) + return endpointHash(endpointutil.DeepHashObjectToString(hashObj)) } // endpointSet provides simple methods for comparing sets of Endpoints. diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index a6294543ad4..06ea773c600 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -39,6 +39,7 @@ import ( "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog" "k8s.io/kubernetes/pkg/controller" + endpointslicemetrics "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" ) @@ -72,6 +73,8 @@ func NewController(podInformer coreinformers.PodInformer, ratelimiter.RegisterMetricAndTrackRateLimiterUsage("endpoint_slice_controller", client.DiscoveryV1alpha1().RESTClient().GetRateLimiter()) } + endpointslicemetrics.RegisterMetrics() + c := &Controller{ client: client, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint_slice"), @@ -108,6 +111,7 @@ func NewController(podInformer coreinformers.PodInformer, client: c.client, nodeLister: c.nodeLister, maxEndpointsPerSlice: c.maxEndpointsPerSlice, + metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice), } c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker() @@ -251,6 +255,7 @@ func (c *Controller) syncService(key string) error { if err != nil { if apierrors.IsNotFound(err) { c.triggerTimeTracker.DeleteService(namespace, name) + c.reconciler.deleteService(namespace, name) // The service has been deleted, return nil so that it won't be retried. return nil } diff --git a/pkg/controller/endpointslice/metrics/BUILD b/pkg/controller/endpointslice/metrics/BUILD new file mode 100644 index 00000000000..ea78d6d6a07 --- /dev/null +++ b/pkg/controller/endpointslice/metrics/BUILD @@ -0,0 +1,42 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "cache.go", + "metrics.go", + ], + importpath = "k8s.io/kubernetes/pkg/controller/endpointslice/metrics", + visibility = ["//visibility:public"], + deps = [ + "//pkg/controller/util/endpoint:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/component-base/metrics:go_default_library", + "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["cache_test.go"], + embed = [":go_default_library"], + deps = [ + "//pkg/controller/util/endpoint:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + ], +) diff --git a/pkg/controller/endpointslice/metrics/cache.go b/pkg/controller/endpointslice/metrics/cache.go new file mode 100644 index 00000000000..16ad6fd1fa7 --- /dev/null +++ b/pkg/controller/endpointslice/metrics/cache.go @@ -0,0 +1,161 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "math" + "sync" + + "k8s.io/apimachinery/pkg/types" + endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" +) + +// NewCache returns a new Cache with the specified endpointsPerSlice. +func NewCache(endpointsPerSlice int32) *Cache { + return &Cache{ + maxEndpointsPerSlice: endpointsPerSlice, + cache: map[types.NamespacedName]*ServicePortCache{}, + } +} + +// Cache tracks values for total numbers of desired endpoints as well as the +// efficiency of EndpointSlice endpoints distribution. +type Cache struct { + // maxEndpointsPerSlice references the maximum number of endpoints that + // should be added to an EndpointSlice. + maxEndpointsPerSlice int32 + + // lock protects changes to numEndpoints and cache. + lock sync.Mutex + // numEndpoints represents the total number of endpoints stored in + // EndpointSlices. + numEndpoints int + // cache stores a ServicePortCache grouped by NamespacedNames representing + // Services. + cache map[types.NamespacedName]*ServicePortCache +} + +// ServicePortCache tracks values for total numbers of desired endpoints as well +// as the efficiency of EndpointSlice endpoints distribution for each unique +// Service Port combination. +type ServicePortCache struct { + items map[endpointutil.PortMapKey]EfficiencyInfo +} + +// EfficiencyInfo stores the number of Endpoints and Slices for calculating +// total numbers of desired endpoints and the efficiency of EndpointSlice +// endpoints distribution. +type EfficiencyInfo struct { + Endpoints int + Slices int +} + +// NewServicePortCache initializes and returns a new ServicePortCache. +func NewServicePortCache() *ServicePortCache { + return &ServicePortCache{ + items: map[endpointutil.PortMapKey]EfficiencyInfo{}, + } +} + +// Set updates the the ServicePortCache to contain the provided EfficiencyInfo +// for the provided PortMapKey. +func (spc *ServicePortCache) Set(pmKey endpointutil.PortMapKey, eInfo EfficiencyInfo) { + spc.items[pmKey] = eInfo +} + +// numEndpoints returns the total number of endpoints represented by a +// ServicePortCache. +func (spc *ServicePortCache) numEndpoints() int { + num := 0 + for _, eInfo := range spc.items { + num += eInfo.Endpoints + } + return num +} + +// UpdateServicePortCache updates a ServicePortCache in the global cache for a +// given Service and updates the corresponding metrics. +// Parameters: +// * serviceNN refers to a NamespacedName representing the Service. +// * spCache refers to a ServicePortCache for the specified Service. +func (c *Cache) UpdateServicePortCache(serviceNN types.NamespacedName, spCache *ServicePortCache) { + c.lock.Lock() + defer c.lock.Unlock() + + prevNumEndpoints := 0 + if existingSPCache, ok := c.cache[serviceNN]; ok { + prevNumEndpoints = existingSPCache.numEndpoints() + } + + currNumEndpoints := spCache.numEndpoints() + // To keep numEndpoints up to date, add the difference between the number of + // endpoints in the provided spCache and any spCache it might be replacing. + c.numEndpoints = c.numEndpoints + currNumEndpoints - prevNumEndpoints + + c.cache[serviceNN] = spCache + c.updateMetrics() +} + +// DeleteService removes references of a Service from the global cache and +// updates the corresponding metrics. +func (c *Cache) DeleteService(serviceNN types.NamespacedName) { + c.lock.Lock() + defer c.lock.Unlock() + + if spCache, ok := c.cache[serviceNN]; ok { + c.numEndpoints = c.numEndpoints - spCache.numEndpoints() + delete(c.cache, serviceNN) + c.updateMetrics() + } +} + +// metricsUpdate stores a desired and actual number of EndpointSlices. +type metricsUpdate struct { + desired, actual int +} + +// desiredAndActualSlices returns a metricsUpdate with the desired and actual +// number of EndpointSlices given the current values in the cache. +// Must be called holding lock. +func (c *Cache) desiredAndActualSlices() metricsUpdate { + mUpdate := metricsUpdate{} + for _, spCache := range c.cache { + for _, eInfo := range spCache.items { + mUpdate.actual += eInfo.Slices + mUpdate.desired += numDesiredSlices(eInfo.Endpoints, int(c.maxEndpointsPerSlice)) + } + } + return mUpdate +} + +// updateMetrics updates metrics with the values from this Cache. +// Must be called holding lock. +func (c *Cache) updateMetrics() { + mUpdate := c.desiredAndActualSlices() + NumEndpointSlices.WithLabelValues().Set(float64(mUpdate.actual)) + DesiredEndpointSlices.WithLabelValues().Set(float64(mUpdate.desired)) + EndpointsDesired.WithLabelValues().Set(float64(c.numEndpoints)) +} + +// numDesiredSlices calculates the number of EndpointSlices that would exist +// with ideal endpoint distribution. +func numDesiredSlices(numEndpoints, maxPerSlice int) int { + if numEndpoints <= maxPerSlice { + return 1 + } + return int(math.Ceil(float64(numEndpoints) / float64(maxPerSlice))) +} diff --git a/pkg/controller/endpointslice/metrics/cache_test.go b/pkg/controller/endpointslice/metrics/cache_test.go new file mode 100644 index 00000000000..8d5d74ef223 --- /dev/null +++ b/pkg/controller/endpointslice/metrics/cache_test.go @@ -0,0 +1,72 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "testing" + + discovery "k8s.io/api/discovery/v1alpha1" + "k8s.io/apimachinery/pkg/types" + endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" +) + +func TestNumEndpointsAndSlices(t *testing.T) { + c := NewCache(int32(100)) + + p80 := int32(80) + p443 := int32(443) + + pmKey80443 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}, {Port: &p443}}) + pmKey80 := endpointutil.NewPortMapKey([]discovery.EndpointPort{{Port: &p80}}) + + spCacheEfficient := NewServicePortCache() + spCacheEfficient.Set(pmKey80, EfficiencyInfo{Endpoints: 45, Slices: 1}) + spCacheEfficient.Set(pmKey80443, EfficiencyInfo{Endpoints: 35, Slices: 1}) + + spCacheInefficient := NewServicePortCache() + spCacheInefficient.Set(pmKey80, EfficiencyInfo{Endpoints: 12, Slices: 5}) + spCacheInefficient.Set(pmKey80443, EfficiencyInfo{Endpoints: 18, Slices: 8}) + + c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, spCacheInefficient) + expectNumEndpointsAndSlices(t, c, 2, 13, 30) + + c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc2"}, spCacheEfficient) + expectNumEndpointsAndSlices(t, c, 4, 15, 110) + + c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc3"}, spCacheInefficient) + expectNumEndpointsAndSlices(t, c, 6, 28, 140) + + c.UpdateServicePortCache(types.NamespacedName{Namespace: "ns1", Name: "svc1"}, spCacheEfficient) + expectNumEndpointsAndSlices(t, c, 6, 17, 190) + + c.DeleteService(types.NamespacedName{Namespace: "ns1", Name: "svc3"}) + expectNumEndpointsAndSlices(t, c, 4, 4, 160) +} + +func expectNumEndpointsAndSlices(t *testing.T, c *Cache, desired int, actual int, numEndpoints int) { + t.Helper() + mUpdate := c.desiredAndActualSlices() + if mUpdate.desired != desired { + t.Errorf("Expected numEndpointSlices to be %d, got %d", desired, mUpdate.desired) + } + if mUpdate.actual != actual { + t.Errorf("Expected desiredEndpointSlices to be %d, got %d", actual, mUpdate.actual) + } + if c.numEndpoints != numEndpoints { + t.Errorf("Expected numEndpoints to be %d, got %d", numEndpoints, c.numEndpoints) + } +} diff --git a/pkg/controller/endpointslice/metrics/metrics.go b/pkg/controller/endpointslice/metrics/metrics.go new file mode 100644 index 00000000000..f00b5ceabe2 --- /dev/null +++ b/pkg/controller/endpointslice/metrics/metrics.go @@ -0,0 +1,110 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "sync" + + "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +// EndpointSliceSubsystem - subsystem name used for Endpoint Slices. +const EndpointSliceSubsystem = "endpoint_slice_controller" + +var ( + // EndpointsAddedPerSync tracks the number of endpoints added on each + // Service sync. + EndpointsAddedPerSync = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "endpoints_added_per_sync", + Help: "Number of endpoints added on each Service sync", + StabilityLevel: metrics.ALPHA, + Buckets: metrics.ExponentialBuckets(2, 2, 15), + }, + []string{}, + ) + // EndpointsRemovedPerSync tracks the number of endpoints removed on each + // Service sync. + EndpointsRemovedPerSync = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "endpoints_removed_per_sync", + Help: "Number of endpoints removed on each Service sync", + StabilityLevel: metrics.ALPHA, + Buckets: metrics.ExponentialBuckets(2, 2, 15), + }, + []string{}, + ) + // EndpointsDesired tracks the total number of desired endpoints. + EndpointsDesired = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "endpoints_desired", + Help: "Number of endpoints desired", + StabilityLevel: metrics.ALPHA, + }, + []string{}, + ) + // NumEndpointSlices tracks the number of EndpointSlices in a cluster. + NumEndpointSlices = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "num_endpoint_slices", + Help: "Number of EndpointSlices", + StabilityLevel: metrics.ALPHA, + }, + []string{}, + ) + // DesiredEndpointSlices tracks the number of EndpointSlices that would + // exist with perfect endpoint allocation. + DesiredEndpointSlices = metrics.NewGaugeVec( + &metrics.GaugeOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "desired_endpoint_slices", + Help: "Number of EndpointSlices that would exist with perfect endpoint allocation", + StabilityLevel: metrics.ALPHA, + }, + []string{}, + ) + + // EndpointSliceChanges tracks the number of changes to Endpoint Slices. + EndpointSliceChanges = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "changes", + Help: "Number of EndpointSlice changes", + StabilityLevel: metrics.ALPHA, + }, + []string{"operation"}, + ) +) + +var registerMetrics sync.Once + +// RegisterMetrics registers EndpointSlice metrics. +func RegisterMetrics() { + registerMetrics.Do(func() { + legacyregistry.MustRegister(EndpointsAddedPerSync) + legacyregistry.MustRegister(EndpointsRemovedPerSync) + legacyregistry.MustRegister(EndpointsDesired) + legacyregistry.MustRegister(NumEndpointSlices) + legacyregistry.MustRegister(DesiredEndpointSlices) + legacyregistry.MustRegister(EndpointSliceChanges) + }) +} diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index a275896713e..ffcda022f65 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -24,10 +24,12 @@ import ( corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1alpha1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" ) @@ -37,6 +39,7 @@ type reconciler struct { client clientset.Interface nodeLister corelisters.NodeLister maxEndpointsPerSlice int32 + metricsCache *metrics.Cache } // endpointMeta includes the attributes we group slices on, this type helps with @@ -52,20 +55,23 @@ type endpointMeta struct { // to ensure the desired set of pods are represented by endpoint slices. func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, existingSlices []*discovery.EndpointSlice, triggerTime time.Time) error { // Build data structures for existing state. - existingSlicesByPortMap := map[portMapKey][]*discovery.EndpointSlice{} + existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{} + numExistingEndpoints := 0 for _, existingSlice := range existingSlices { - epHash := newPortMapKey(existingSlice.Ports) + epHash := endpointutil.NewPortMapKey(existingSlice.Ports) existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice) + numExistingEndpoints += len(existingSlice.Endpoints) } // Build data structures for desired state. - desiredMetaByPortMap := map[portMapKey]*endpointMeta{} - desiredEndpointsByPortMap := map[portMapKey]endpointSet{} + desiredMetaByPortMap := map[endpointutil.PortMapKey]*endpointMeta{} + desiredEndpointsByPortMap := map[endpointutil.PortMapKey]endpointSet{} + numDesiredEndpoints := 0 for _, pod := range pods { if endpointutil.ShouldPodBeInEndpoints(pod) { endpointPorts := getEndpointPorts(service, pod) - epHash := newPortMapKey(endpointPorts) + epHash := endpointutil.NewPortMapKey(endpointPorts) if _, ok := desiredEndpointsByPortMap[epHash]; !ok { desiredEndpointsByPortMap[epHash] = endpointSet{} } @@ -85,17 +91,31 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis } endpoint := podToEndpoint(pod, node) desiredEndpointsByPortMap[epHash].Insert(&endpoint) + numDesiredEndpoints++ } } slicesToCreate := []*discovery.EndpointSlice{} slicesToUpdate := []*discovery.EndpointSlice{} sliceNamesToDelete := sets.String{} + spMetrics := metrics.NewServicePortCache() + totalAdded := 0 + totalRemoved := 0 // Determine changes necessary for each group of slices by port map. for portMap, desiredEndpoints := range desiredEndpointsByPortMap { - pmSlicesToCreate, pmSlicesToUpdate, pmSliceNamesToDelete := r.reconcileByPortMapping( + numEndpoints := len(desiredEndpoints) + pmSlicesToCreate, pmSlicesToUpdate, pmSliceNamesToDelete, added, removed := r.reconcileByPortMapping( service, existingSlicesByPortMap[portMap], desiredEndpoints, desiredMetaByPortMap[portMap]) + + totalAdded += added + totalRemoved += removed + + spMetrics.Set(portMap, metrics.EfficiencyInfo{ + Endpoints: numEndpoints, + Slices: len(existingSlicesByPortMap[portMap]) + len(pmSlicesToCreate) - len(pmSliceNamesToDelete), + }) + if len(pmSlicesToCreate) > 0 { slicesToCreate = append(slicesToCreate, pmSlicesToCreate...) } @@ -121,8 +141,18 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis if len(existingSlices) == sliceNamesToDelete.Len() && len(slicesToCreate) < 1 { placeholderSlice := newEndpointSlice(service, &endpointMeta{Ports: []discovery.EndpointPort{}}) slicesToCreate = append(slicesToCreate, placeholderSlice) + spMetrics.Set(endpointutil.NewPortMapKey(placeholderSlice.Ports), metrics.EfficiencyInfo{ + Endpoints: 0, + Slices: 1, + }) } + metrics.EndpointsAddedPerSync.WithLabelValues().Observe(float64(totalAdded)) + metrics.EndpointsRemovedPerSync.WithLabelValues().Observe(float64(totalRemoved)) + + serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace} + r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics) + return r.finalize(service, slicesToCreate, slicesToUpdate, sliceNamesToDelete, triggerTime) } @@ -151,6 +181,8 @@ func (r *reconciler) finalize( _, err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Create(endpointSlice) if err != nil { errs = append(errs, fmt.Errorf("Error creating EndpointSlice for Service %s/%s: %v", service.Namespace, service.Name, err)) + } else { + metrics.EndpointSliceChanges.WithLabelValues("create").Inc() } } @@ -159,6 +191,8 @@ func (r *reconciler) finalize( _, err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Update(endpointSlice) if err != nil { errs = append(errs, fmt.Errorf("Error updating %s EndpointSlice for Service %s/%s: %v", endpointSlice.Name, service.Namespace, service.Name, err)) + } else { + metrics.EndpointSliceChanges.WithLabelValues("update").Inc() } } @@ -167,6 +201,8 @@ func (r *reconciler) finalize( err := r.client.DiscoveryV1alpha1().EndpointSlices(service.Namespace).Delete(sliceName, &metav1.DeleteOptions{}) if err != nil { errs = append(errs, fmt.Errorf("Error deleting %s EndpointSlice for Service %s/%s: %v", sliceName, service.Namespace, service.Name, err)) + } else { + metrics.EndpointSliceChanges.WithLabelValues("delete").Inc() } } @@ -187,11 +223,12 @@ func (r *reconciler) reconcileByPortMapping( existingSlices []*discovery.EndpointSlice, desiredSet endpointSet, endpointMeta *endpointMeta, -) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, sets.String) { +) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice, sets.String, int, int) { slicesByName := map[string]*discovery.EndpointSlice{} sliceNamesUnchanged := sets.String{} sliceNamesToUpdate := sets.String{} sliceNamesToDelete := sets.String{} + numRemoved := 0 // 1. Iterate through existing slices to delete endpoints no longer desired // and update endpoints that have changed @@ -217,6 +254,9 @@ func (r *reconciler) reconcileByPortMapping( // If an endpoint was updated or removed, mark for update or delete if endpointUpdated || len(existingSlice.Endpoints) != len(newEndpoints) { + if len(existingSlice.Endpoints) > len(newEndpoints) { + numRemoved += len(existingSlice.Endpoints) - len(newEndpoints) + } if len(newEndpoints) == 0 { // if no endpoints desired in this slice, mark for deletion sliceNamesToDelete.Insert(existingSlice.Name) @@ -231,6 +271,8 @@ func (r *reconciler) reconcileByPortMapping( } } + numAdded := desiredSet.Len() + // 2. If we still have desired endpoints to add and slices marked for update, // iterate through the slices and fill them up with the desired endpoints. if desiredSet.Len() > 0 && sliceNamesToUpdate.Len() > 0 { @@ -297,5 +339,9 @@ func (r *reconciler) reconcileByPortMapping( slicesToUpdate = append(slicesToUpdate, slicesByName[sliceName]) } - return slicesToCreate, slicesToUpdate, sliceNamesToDelete + return slicesToCreate, slicesToUpdate, sliceNamesToDelete, numAdded, numRemoved +} + +func (r *reconciler) deleteService(namespace, name string) { + r.metricsCache.DeleteService(types.NamespacedName{Namespace: namespace, Name: name}) } diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index 031f18b0bbd..a1add9d58a0 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -17,9 +17,11 @@ limitations under the License. package endpointslice import ( + "fmt" "testing" "time" + dto "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -31,7 +33,9 @@ import ( "k8s.io/client-go/kubernetes/fake" corelisters "k8s.io/client-go/listers/core/v1" k8stesting "k8s.io/client-go/testing" + compmetrics "k8s.io/component-base/metrics" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" utilpointer "k8s.io/utils/pointer" ) @@ -40,8 +44,9 @@ var defaultMaxEndpointsPerSlice = int32(100) // Even when there are no pods, we want to have a placeholder slice for each service func TestReconcileEmpty(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, _ := newServiceAndendpointMeta("foo", namespace) + svc, _ := newServiceAndEndpointMeta("foo", namespace) r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) reconcileHelper(t, r, &svc, []*corev1.Pod{}, []*discovery.EndpointSlice{}, time.Now()) @@ -54,14 +59,16 @@ func TestReconcileEmpty(t *testing.T) { assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0}) } // Given a single pod matching a service selector and no existing endpoint slices, // a slice should be created func TestReconcile1Pod(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, _ := newServiceAndendpointMeta("foo", namespace) + svc, _ := newServiceAndEndpointMeta("foo", namespace) pod1 := newPod(1, namespace, true, 1) pod1.Spec.Hostname = "example-hostname" node1 := &corev1.Node{ @@ -100,14 +107,16 @@ func TestReconcile1Pod(t *testing.T) { Name: "pod1", }, }}, slices[0].Endpoints) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 1, addedPerSync: 1, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0}) } // given an existing endpoint slice and no pods matching the service, the existing // slice should be updated to a placeholder (not deleted) func TestReconcile1EndpointSlice(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, endpointMeta := newServiceAndendpointMeta("foo", namespace) + svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace) endpointSlice1 := newEmptyEndpointSlice(1, namespace, endpointMeta, svc) _, createErr := client.DiscoveryV1alpha1().EndpointSlices(namespace).Create(endpointSlice1) @@ -127,14 +136,16 @@ func TestReconcile1EndpointSlice(t *testing.T) { assert.Equal(t, svc.Name, slices[0].Labels[discovery.LabelServiceName]) assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0}) } // a simple use case with 250 pods matching a service and no existing slices // reconcile should create 3 slices, completely filling 2 of them func TestReconcileManyPods(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, _ := newServiceAndendpointMeta("foo", namespace) + svc, _ := newServiceAndEndpointMeta("foo", namespace) // start with 250 pods pods := []*corev1.Pod{} @@ -152,6 +163,7 @@ func TestReconcileManyPods(t *testing.T) { // Two endpoint slices should be completely full, the remainder should be in another one expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0}) } // now with preexisting slices, we have 250 pods matching a service @@ -164,8 +176,9 @@ func TestReconcileManyPods(t *testing.T) { // this approach requires 1 update + 1 create instead of 2 updates + 1 create func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, endpointMeta := newServiceAndendpointMeta("foo", namespace) + svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace) // start with 250 pods pods := []*corev1.Pod{} @@ -200,6 +213,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) { // 1 new slice (0->100) + 1 updated slice (62->89) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0}) } // now with preexisting slices, we have 300 pods matching a service @@ -214,8 +228,9 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) { // this approach requires 2 creates instead of 2 updates + 1 create func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, endpointMeta := newServiceAndendpointMeta("foo", namespace) + svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace) // start with 300 pods pods := []*corev1.Pod{} @@ -249,6 +264,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) { // 2 new slices (100, 52) in addition to existing slices (74, 74) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0}) } // In some cases, such as a service port change, all slices for that service will require a change @@ -256,7 +272,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) { func TestReconcileEndpointSlicesUpdating(t *testing.T) { client := newClientset() namespace := "test" - svc, _ := newServiceAndendpointMeta("foo", namespace) + svc, _ := newServiceAndEndpointMeta("foo", namespace) // start with 250 pods pods := []*corev1.Pod{} @@ -290,8 +306,9 @@ func TestReconcileEndpointSlicesUpdating(t *testing.T) { // reconcile repacks the endpoints into 3 slices, and deletes the extras func TestReconcileEndpointSlicesRecycling(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, endpointMeta := newServiceAndendpointMeta("foo", namespace) + svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace) // start with 300 pods pods := []*corev1.Pod{} @@ -327,6 +344,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) { // thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7}) } // In this test, we want to verify that endpoints are added to a slice that will @@ -334,8 +352,9 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) { // for update. func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" - svc, endpointMeta := newServiceAndendpointMeta("foo", namespace) + svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace) existingSlices := []*discovery.EndpointSlice{} pods := []*corev1.Pod{} @@ -378,6 +397,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { // ensure that both endpoint slices have been updated expectActions(t, client.Actions(), 2, "update", "endpointslices") + expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0}) // additional pods should get added to fuller slice expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20}) @@ -387,6 +407,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { // This test ensures that EndpointSlices are grouped correctly in that case. func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { client := newClientset() + setupMetrics() namespace := "test" portNameIntStr := intstr.IntOrString{ @@ -425,6 +446,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { // reconcile should create 5 endpoint slices assert.Equal(t, 5, len(client.Actions()), "Expected 5 client actions as part of reconcile") expectActions(t, client.Actions(), 5, "create", "endpointslices") + expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0}) fetchedSlices := fetchEndpointSlices(t, client, namespace) @@ -454,7 +476,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { // appropriate endpoints distribution among slices func TestReconcileMaxEndpointsPerSlice(t *testing.T) { namespace := "test" - svc, _ := newServiceAndendpointMeta("foo", namespace) + svc, _ := newServiceAndEndpointMeta("foo", namespace) // start with 250 pods pods := []*corev1.Pod{} @@ -466,33 +488,69 @@ func TestReconcileMaxEndpointsPerSlice(t *testing.T) { testCases := []struct { maxEndpointsPerSlice int32 expectedSliceLengths []int + expectedMetricValues expectedMetrics }{ { maxEndpointsPerSlice: int32(50), expectedSliceLengths: []int{50, 50, 50, 50, 50}, + expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5}, }, { maxEndpointsPerSlice: int32(80), expectedSliceLengths: []int{80, 80, 80, 10}, + expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4}, }, { maxEndpointsPerSlice: int32(150), expectedSliceLengths: []int{150, 100}, + expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2}, }, { maxEndpointsPerSlice: int32(250), expectedSliceLengths: []int{250}, + expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1}, }, { maxEndpointsPerSlice: int32(500), expectedSliceLengths: []int{250}, + expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1}, }, } for _, testCase := range testCases { - client := newClientset() - r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, testCase.maxEndpointsPerSlice) - reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now()) - expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), testCase.expectedSliceLengths) + t.Run(fmt.Sprintf("maxEndpointsPerSlice: %d", testCase.maxEndpointsPerSlice), func(t *testing.T) { + client := newClientset() + setupMetrics() + r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, testCase.maxEndpointsPerSlice) + reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now()) + expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), testCase.expectedSliceLengths) + expectMetrics(t, testCase.expectedMetricValues) + }) } } +func TestReconcileEndpointSlicesMetrics(t *testing.T) { + client := newClientset() + setupMetrics() + namespace := "test" + svc, _ := newServiceAndEndpointMeta("foo", namespace) + + // start with 20 pods + pods := []*corev1.Pod{} + for i := 0; i < 20; i++ { + pods = append(pods, newPod(i, namespace, true, 1)) + } + + r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice) + reconcileHelper(t, r, &svc, pods, []*discovery.EndpointSlice{}, time.Now()) + + actions := client.Actions() + assert.Equal(t, 1, len(actions), "Expected 1 additional client actions as part of reconcile") + assert.True(t, actions[0].Matches("create", "endpointslices"), "First action should be create endpoint slice") + + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0}) + + fetchedSlices := fetchEndpointSlices(t, client, namespace) + reconcileHelper(t, r, &svc, pods[0:10], []*discovery.EndpointSlice{&fetchedSlices[0]}, time.Now()) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0}) +} + // Test Helpers func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler { @@ -507,6 +565,7 @@ func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPer client: client, nodeLister: corelisters.NewNodeLister(indexer), maxEndpointsPerSlice: maxEndpointsPerSlice, + metricsCache: metrics.NewCache(maxEndpointsPerSlice), } } @@ -604,3 +663,99 @@ func reconcileHelper(t *testing.T, r *reconciler, service *corev1.Service, pods t.Fatalf("Expected no error reconciling Endpoint Slices, got: %v", err) } } + +// Metrics helpers + +type expectedMetrics struct { + desiredSlices int + actualSlices int + desiredEndpoints int + addedPerSync int + removedPerSync int + numCreated int + numUpdated int + numDeleted int +} + +func expectMetrics(t *testing.T, em expectedMetrics) { + t.Helper() + + actualDesiredSlices := getGaugeMetricValue(t, metrics.DesiredEndpointSlices.WithLabelValues()) + if actualDesiredSlices != float64(em.desiredSlices) { + t.Errorf("Expected desiredEndpointSlices to be %d, got %v", em.desiredSlices, actualDesiredSlices) + } + + actualNumSlices := getGaugeMetricValue(t, metrics.NumEndpointSlices.WithLabelValues()) + if actualDesiredSlices != float64(em.desiredSlices) { + t.Errorf("Expected numEndpointSlices to be %d, got %v", em.actualSlices, actualNumSlices) + } + + actualEndpointsDesired := getGaugeMetricValue(t, metrics.EndpointsDesired.WithLabelValues()) + if actualEndpointsDesired != float64(em.desiredEndpoints) { + t.Errorf("Expected desiredEndpoints to be %d, got %v", em.desiredEndpoints, actualEndpointsDesired) + } + + actualAddedPerSync := getHistogramMetricValue(t, metrics.EndpointsAddedPerSync.WithLabelValues()) + if actualAddedPerSync != float64(em.addedPerSync) { + t.Errorf("Expected endpointsAddedPerSync to be %d, got %v", em.addedPerSync, actualAddedPerSync) + } + + actualRemovedPerSync := getHistogramMetricValue(t, metrics.EndpointsRemovedPerSync.WithLabelValues()) + if actualRemovedPerSync != float64(em.removedPerSync) { + t.Errorf("Expected endpointsRemovedPerSync to be %d, got %v", em.removedPerSync, actualRemovedPerSync) + } + + actualCreated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("create")) + if actualCreated != float64(em.numCreated) { + t.Errorf("Expected endpointSliceChangesCreated to be %d, got %v", em.numCreated, actualCreated) + } + + actualUpdated := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("update")) + if actualUpdated != float64(em.numUpdated) { + t.Errorf("Expected endpointSliceChangesUpdated to be %d, got %v", em.numUpdated, actualUpdated) + } + + actualDeleted := getCounterMetricValue(t, metrics.EndpointSliceChanges.WithLabelValues("delete")) + if actualDeleted != float64(em.numDeleted) { + t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted) + } +} + +func setupMetrics() { + metrics.RegisterMetrics() + metrics.NumEndpointSlices.Delete(map[string]string{}) + metrics.DesiredEndpointSlices.Delete(map[string]string{}) + metrics.EndpointsDesired.Delete(map[string]string{}) + metrics.EndpointsAddedPerSync.Delete(map[string]string{}) + metrics.EndpointsRemovedPerSync.Delete(map[string]string{}) + metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"}) + metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"}) + metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"}) +} + +func getGaugeMetricValue(t *testing.T, metric compmetrics.GaugeMetric) float64 { + t.Helper() + metricProto := &dto.Metric{} + if err := metric.Write(metricProto); err != nil { + t.Errorf("Error writing metric: %v", err) + } + return metricProto.Gauge.GetValue() +} + +func getCounterMetricValue(t *testing.T, metric compmetrics.CounterMetric) float64 { + t.Helper() + metricProto := &dto.Metric{} + if err := metric.(compmetrics.Metric).Write(metricProto); err != nil { + t.Errorf("Error writing metric: %v", err) + } + return metricProto.Counter.GetValue() +} + +func getHistogramMetricValue(t *testing.T, metric compmetrics.ObserverMetric) float64 { + t.Helper() + metricProto := &dto.Metric{} + if err := metric.(compmetrics.Metric).Write(metricProto); err != nil { + t.Errorf("Error writing metric: %v", err) + } + return metricProto.Histogram.GetSampleSum() +} diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index 7de30bd2260..91b1930de66 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -17,11 +17,8 @@ limitations under the License. package endpointslice import ( - "crypto/md5" - "encoding/hex" "fmt" "reflect" - "sort" "time" corev1 "k8s.io/api/core/v1" @@ -33,7 +30,6 @@ import ( podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/discovery/validation" - "k8s.io/kubernetes/pkg/util/hash" ) // podEndpointChanged returns true if the results of podToEndpoint are different @@ -235,21 +231,6 @@ func addTriggerTimeAnnotation(endpointSlice *discovery.EndpointSlice, triggerTim } } -// deepHashObject creates a unique hash string from a go object. -func deepHashObjectToString(objectToWrite interface{}) string { - hasher := md5.New() - hash.DeepHashObject(hasher, objectToWrite) - return hex.EncodeToString(hasher.Sum(nil)[0:]) -} - -// portMapKey is used to uniquely identify groups of endpoint ports. -type portMapKey string - -func newPortMapKey(endpointPorts []discovery.EndpointPort) portMapKey { - sort.Sort(portsInOrder(endpointPorts)) - return portMapKey(deepHashObjectToString(endpointPorts)) -} - // endpointSliceEndpointLen helps sort endpoint slices by the number of // endpoints they contain. type endpointSliceEndpointLen []*discovery.EndpointSlice @@ -259,14 +240,3 @@ func (sl endpointSliceEndpointLen) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] func (sl endpointSliceEndpointLen) Less(i, j int) bool { return len(sl[i].Endpoints) > len(sl[j].Endpoints) } - -// portsInOrder helps sort endpoint ports in a consistent way for hashing. -type portsInOrder []discovery.EndpointPort - -func (sl portsInOrder) Len() int { return len(sl) } -func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] } -func (sl portsInOrder) Less(i, j int) bool { - h1 := deepHashObjectToString(sl[i]) - h2 := deepHashObjectToString(sl[j]) - return h1 < h2 -} diff --git a/pkg/controller/endpointslice/utils_test.go b/pkg/controller/endpointslice/utils_test.go index da482f77d9d..1ac569df11f 100644 --- a/pkg/controller/endpointslice/utils_test.go +++ b/pkg/controller/endpointslice/utils_test.go @@ -180,7 +180,7 @@ func TestPodToEndpoint(t *testing.T) { } } -func TestPodChangedWithpodEndpointChanged(t *testing.T) { +func TestPodChangedWithPodEndpointChanged(t *testing.T) { podStore := cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) ns := "test" podStore.Add(newPod(1, ns, true, 1)) @@ -287,7 +287,7 @@ func newClientset() *fake.Clientset { return client } -func newServiceAndendpointMeta(name, namespace string) (v1.Service, endpointMeta) { +func newServiceAndEndpointMeta(name, namespace string) (v1.Service, endpointMeta) { portNum := int32(80) portNameIntStr := intstr.IntOrString{ Type: intstr.Int, diff --git a/pkg/controller/util/endpoint/BUILD b/pkg/controller/util/endpoint/BUILD index e400de4e421..c7c3c19a528 100644 --- a/pkg/controller/util/endpoint/BUILD +++ b/pkg/controller/util/endpoint/BUILD @@ -11,7 +11,9 @@ go_library( deps = [ "//pkg/api/v1/pod:go_default_library", "//pkg/controller:go_default_library", + "//pkg/util/hash:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index 38f70a16bb2..07d3f3e1cca 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -17,21 +17,42 @@ limitations under the License. package endpoint import ( + "crypto/md5" + "encoding/hex" "fmt" "reflect" + "sort" v1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" v1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/util/hash" ) // EndpointsMatch is a type of function that returns true if pod endpoints match. type EndpointsMatch func(*v1.Pod, *v1.Pod) bool +// PortMapKey is used to uniquely identify groups of endpoint ports. +type PortMapKey string + +// NewPortMapKey generates a PortMapKey from endpoint ports. +func NewPortMapKey(endpointPorts []discovery.EndpointPort) PortMapKey { + sort.Sort(portsInOrder(endpointPorts)) + return PortMapKey(DeepHashObjectToString(endpointPorts)) +} + +// DeepHashObjectToString creates a unique hash string from a go object. +func DeepHashObjectToString(objectToWrite interface{}) string { + hasher := md5.New() + hash.DeepHashObject(hasher, objectToWrite) + return hex.EncodeToString(hasher.Sum(nil)[0:]) +} + // ShouldPodBeInEndpoints returns true if a specified pod should be in an // endpoints object. func ShouldPodBeInEndpoints(pod *v1.Pod) bool { @@ -172,3 +193,14 @@ func determineNeededServiceUpdates(oldServices, services sets.String, podChanged } return services } + +// portsInOrder helps sort endpoint ports in a consistent way for hashing. +type portsInOrder []discovery.EndpointPort + +func (sl portsInOrder) Len() int { return len(sl) } +func (sl portsInOrder) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] } +func (sl portsInOrder) Less(i, j int) bool { + h1 := DeepHashObjectToString(sl[i]) + h2 := DeepHashObjectToString(sl[j]) + return h1 < h2 +}