diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index b5009fa0429..b570455a7f4 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" discoveryinformers "k8s.io/client-go/informers/discovery/v1" clientset "k8s.io/client-go/kubernetes" @@ -42,7 +43,9 @@ import ( "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" endpointslicemetrics "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" + "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" + "k8s.io/kubernetes/pkg/features" ) const ( @@ -142,13 +145,6 @@ func NewController(podInformer coreinformers.PodInformer, c.maxEndpointsPerSlice = maxEndpointsPerSlice - c.reconciler = &reconciler{ - client: c.client, - nodeLister: c.nodeLister, - maxEndpointsPerSlice: c.maxEndpointsPerSlice, - endpointSliceTracker: c.endpointSliceTracker, - metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice), - } c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker() c.eventBroadcaster = broadcaster @@ -157,6 +153,25 @@ func NewController(podInformer coreinformers.PodInformer, c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod c.serviceSelectorCache = endpointutil.NewServiceSelectorCache() + if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { + nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addNode, + UpdateFunc: c.updateNode, + DeleteFunc: c.deleteNode, + }) + + c.topologyCache = topologycache.NewTopologyCache() + } + + c.reconciler = &reconciler{ + client: c.client, + nodeLister: c.nodeLister, + maxEndpointsPerSlice: c.maxEndpointsPerSlice, + endpointSliceTracker: c.endpointSliceTracker, + metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice), + topologyCache: c.topologyCache, + } + return c } @@ -227,6 +242,10 @@ type Controller struct { // serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls // to AsSelectorPreValidated (see #73527) serviceSelectorCache *endpointutil.ServiceSelectorCache + + // topologyCache tracks the distribution of Nodes and endpoints across zones + // to enable TopologyAwareHints. + topologyCache *topologycache.TopologyCache } // Run will not return until stopCh is closed. @@ -275,6 +294,8 @@ func (c *Controller) processNextWorkItem() bool { } func (c *Controller) handleErr(err error, key interface{}) { + trackSync(err) + if err == nil { c.queue.Forget(key) return @@ -490,3 +511,50 @@ func (c *Controller) deletePod(obj interface{}) { c.addPod(pod) } } + +func (c *Controller) addNode(obj interface{}) { + c.checkNodeTopologyDistribution() +} + +func (c *Controller) updateNode(old, cur interface{}) { + oldNode := old.(*v1.Node) + curNode := cur.(*v1.Node) + + if topologycache.NodeReady(oldNode.Status) != topologycache.NodeReady(curNode.Status) { + c.checkNodeTopologyDistribution() + } +} + +func (c *Controller) deleteNode(obj interface{}) { + c.checkNodeTopologyDistribution() +} + +// checkNodeTopologyDistribution updates Nodes in the topology cache and then +// queues any Services that are past the threshold. 
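+// It is a no-op when the topology cache is nil, i.e. when the
+// TopologyAwareHints feature gate is disabled.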
+func (c *Controller) checkNodeTopologyDistribution() { + if c.topologyCache == nil { + return + } + nodes, err := c.nodeLister.List(labels.Everything()) + if err != nil { + klog.Errorf("Error listing Nodes: %v", err) + } + c.topologyCache.SetNodes(nodes) + serviceKeys := c.topologyCache.GetOverloadedServices() + for _, serviceKey := range serviceKeys { + c.queue.Add(serviceKey) + } +} + +// trackSync increments the EndpointSliceSyncs metric with the result of a sync. +func trackSync(err error) { + metricLabel := "success" + if err != nil { + if isStaleInformerCacheErr(err) { + metricLabel = "stale" + } else { + metricLabel = "error" + } + } + endpointslicemetrics.EndpointSliceSyncs.WithLabelValues(metricLabel).Inc() +} diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go index a2b09e40f5d..a8fda60bba3 100644 --- a/pkg/controller/endpointslice/endpointslice_controller_test.go +++ b/pkg/controller/endpointslice/endpointslice_controller_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -40,6 +41,7 @@ import ( "k8s.io/client-go/tools/cache" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" "k8s.io/kubernetes/pkg/features" utilpointer "k8s.io/utils/pointer" @@ -1501,6 +1503,183 @@ func TestSyncServiceStaleInformer(t *testing.T) { } } +func Test_checkNodeTopologyDistribution(t *testing.T) { + zoneA := "zone-a" + zoneB := "zone-b" + zoneC := "zone-c" + + readyTrue := true + readyFalse := false + + cpu100 := resource.MustParse("100m") + cpu1000 := resource.MustParse("1000m") + cpu2000 := resource.MustParse("2000m") + + type nodeInfo struct { + zoneLabel *string + ready *bool + cpu *resource.Quantity + } + + testCases := []struct { + name string + nodes []nodeInfo + topologyCacheEnabled bool + endpointZoneInfo map[string]topologycache.EndpointZoneInfo + expectedQueueLen int + }{{ + name: "empty", + nodes: []nodeInfo{}, + topologyCacheEnabled: false, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{}, + expectedQueueLen: 0, + }, { + name: "lopsided, queue required", + nodes: []nodeInfo{ + {zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu100}, + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3}, + }, + expectedQueueLen: 1, + }, { + name: "lopsided but 1 unready, queue required because unready node means 0 CPU in one zone", + nodes: []nodeInfo{ + {zoneLabel: &zoneA, ready: &readyFalse, cpu: &cpu100}, + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3}, + }, + expectedQueueLen: 1, + }, { + name: "even zones, uneven endpoint distribution but within threshold, no sync required", + nodes: []nodeInfo{ + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneB, ready: 
&readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneB: 5, zoneC: 4}, + }, + expectedQueueLen: 0, + }, { + name: "even zones but node missing zone, sync required", + nodes: []nodeInfo{ + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, + {ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneB: 5, zoneC: 4}, + }, + expectedQueueLen: 1, + }, { + name: "even zones but node missing cpu, sync required", + nodes: []nodeInfo{ + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneB, ready: &readyTrue}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneB: 5, zoneC: 4}, + }, + expectedQueueLen: 1, + }, { + name: "even zones, uneven endpoint distribution beyond threshold, no sync required", + nodes: []nodeInfo{ + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneB: 6, zoneC: 4}, + }, + expectedQueueLen: 1, + }, { + name: "3 uneven zones, matching endpoint distribution, no sync required", + nodes: []nodeInfo{ + {zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 1}, + }, + expectedQueueLen: 0, + }, { + name: "3 uneven zones, endpoint distribution within threshold but below 1, sync required", + nodes: []nodeInfo{ + {zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000}, + {zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000}, + {zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100}, + }, + topologyCacheEnabled: true, + endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{ + "ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 0}, + }, + expectedQueueLen: 1, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, esController := newController([]string{}, time.Duration(0)) + + for i, nodeInfo := range tc.nodes { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("node-%d", i)}, + Status: v1.NodeStatus{}, + } + if nodeInfo.zoneLabel != nil { + node.Labels = map[string]string{v1.LabelTopologyZone: *nodeInfo.zoneLabel} + } + if nodeInfo.ready != nil { + status := v1.ConditionFalse + if *nodeInfo.ready { + status = v1.ConditionTrue + } + node.Status.Conditions = []v1.NodeCondition{{ + Type: v1.NodeReady, + Status: status, + }} + } + if nodeInfo.cpu != nil { + node.Status.Allocatable = v1.ResourceList{ + v1.ResourceCPU: *nodeInfo.cpu, + } + } + esController.nodeStore.Add(node) + if tc.topologyCacheEnabled { + esController.topologyCache = topologycache.NewTopologyCache() + for serviceKey, endpointZoneInfo := range 
tc.endpointZoneInfo { + esController.topologyCache.SetHints(serviceKey, discovery.AddressTypeIPv4, endpointZoneInfo) + } + } + } + + esController.checkNodeTopologyDistribution() + + if esController.queue.Len() != tc.expectedQueueLen { + t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len()) + } + }) + } +} + // Test helpers func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) { t.Helper() diff --git a/pkg/controller/endpointslice/metrics/metrics.go b/pkg/controller/endpointslice/metrics/metrics.go index f00b5ceabe2..90d0fbb8f4f 100644 --- a/pkg/controller/endpointslice/metrics/metrics.go +++ b/pkg/controller/endpointslice/metrics/metrics.go @@ -93,6 +93,29 @@ var ( }, []string{"operation"}, ) + + // EndpointSlicesChangedPerSync observes the number of EndpointSlices + // changed per sync. + EndpointSlicesChangedPerSync = metrics.NewHistogramVec( + &metrics.HistogramOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "endpointslices_changed_per_sync", + Help: "Number of EndpointSlices changed on each Service sync", + }, + []string{"topology"}, // either "auto" or "disabled" + ) + + // EndpointSliceSyncs tracks the number of sync operations the controller + // runs along with their result. + EndpointSliceSyncs = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: EndpointSliceSubsystem, + Name: "syncs", + Help: "Number of EndpointSlice syncs", + StabilityLevel: metrics.ALPHA, + }, + []string{"result"}, // either "success", "stale", or "error" + ) ) var registerMetrics sync.Once @@ -106,5 +129,7 @@ func RegisterMetrics() { legacyregistry.MustRegister(NumEndpointSlices) legacyregistry.MustRegister(DesiredEndpointSlices) legacyregistry.MustRegister(EndpointSliceChanges) + legacyregistry.MustRegister(EndpointSlicesChangedPerSync) + legacyregistry.MustRegister(EndpointSliceSyncs) }) } diff --git a/pkg/controller/endpointslice/reconciler.go b/pkg/controller/endpointslice/reconciler.go index 88fcac6b091..2767af5497a 100644 --- a/pkg/controller/endpointslice/reconciler.go +++ b/pkg/controller/endpointslice/reconciler.go @@ -32,7 +32,9 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" + "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint" "k8s.io/kubernetes/pkg/features" ) @@ -45,6 +47,9 @@ type reconciler struct { maxEndpointsPerSlice int32 endpointSliceTracker *endpointSliceTracker metricsCache *metrics.Cache + // topologyCache tracks the distribution of Nodes and endpoints across zones + // to enable TopologyAwareHints. 
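+	// It is nil when the TopologyAwareHints feature gate is disabled.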
+ topologyCache *topologycache.TopologyCache } // endpointMeta includes the attributes we group slices on, this type helps with @@ -73,6 +78,15 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis for _, existingSlice := range existingSlices { // service no longer supports that address type, add it to deleted slices if _, ok := serviceSupportedAddressesTypes[existingSlice.AddressType]; !ok { + if r.topologyCache != nil { + svcKey, err := serviceControllerKey(existingSlice) + if err != nil { + klog.Warningf("Couldn't get key to remove EndpointSlice from topology cache %+v: %v", existingSlice, err) + } else { + r.topologyCache.RemoveHints(svcKey, existingSlice.AddressType) + } + } + slicesToDelete = append(slicesToDelete, existingSlice) continue } @@ -222,6 +236,25 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace} r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics) + // Topology hints are assigned per address type. This means it is + // theoretically possible for endpoints of one address type to be assigned + // hints while another endpoints of another address type are not. + si := &topologycache.SliceInfo{ + ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name), + ToCreate: slicesToCreate, + ToUpdate: slicesToUpdate, + Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete), + } + + if r.topologyCache != nil && hintsEnabled(service.Annotations) { + slicesToCreate, slicesToUpdate = r.topologyCache.AddHints(si) + } else { + if r.topologyCache != nil { + r.topologyCache.RemoveHints(si.ServiceKey, addressType) + } + slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si) + } + return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime) } @@ -297,6 +330,14 @@ func (r *reconciler) finalize( metrics.EndpointSliceChanges.WithLabelValues("delete").Inc() } + topologyLabel := "disabled" + if r.topologyCache != nil && hintsEnabled(service.Annotations) { + topologyLabel = "auto" + } + + numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete) + metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged)) + return nil } diff --git a/pkg/controller/endpointslice/reconciler_test.go b/pkg/controller/endpointslice/reconciler_test.go index 6a185e29ac5..b20aa2cfb0a 100644 --- a/pkg/controller/endpointslice/reconciler_test.go +++ b/pkg/controller/endpointslice/reconciler_test.go @@ -29,6 +29,7 @@ import ( corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/intstr" @@ -41,6 +42,7 @@ import ( "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/endpointslice/metrics" + "k8s.io/kubernetes/pkg/controller/endpointslice/topologycache" "k8s.io/kubernetes/pkg/features" utilpointer "k8s.io/utils/pointer" ) @@ -66,7 +68,7 @@ func TestReconcileEmpty(t *testing.T) { assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) expectTrackedGeneration(t, r.endpointSliceTracker, &slices[0], 1) - expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, 
addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 1}) } // Given a single pod matching a service selector and no existing endpoint slices, @@ -436,16 +438,22 @@ func TestReconcile1Pod(t *testing.T) { expectTrackedGeneration(t, r.endpointSliceTracker, &slice, 1) + expectSlicesChangedPerSync := 1 + if testCase.service.Spec.IPFamilies != nil && len(testCase.service.Spec.IPFamilies) > 0 { + expectSlicesChangedPerSync = len(testCase.service.Spec.IPFamilies) + } expectMetrics(t, expectedMetrics{ - desiredSlices: 1, - actualSlices: 1, - desiredEndpoints: 1, - addedPerSync: len(testCase.expectedEndpointPerSlice), - removedPerSync: 0, - numCreated: len(testCase.expectedEndpointPerSlice), - numUpdated: 0, - numDeleted: 0}) + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 1, + addedPerSync: len(testCase.expectedEndpointPerSlice), + removedPerSync: 0, + numCreated: len(testCase.expectedEndpointPerSlice), + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: expectSlicesChangedPerSync, + }) } }) } @@ -478,7 +486,7 @@ func TestReconcile1EndpointSlice(t *testing.T) { assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports) assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints) expectTrackedGeneration(t, r.endpointSliceTracker, &slices[0], 1) - expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0, slicesChangedPerSync: 1}) } // when a Service has PublishNotReadyAddresses set to true, corresponding @@ -539,7 +547,7 @@ func TestReconcileManyPods(t *testing.T) { // Two endpoint slices should be completely full, the remainder should be in another one expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50}) - expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 3}) } // now with preexisting slices, we have 250 pods matching a service @@ -590,7 +598,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) { // 1 new slice (0->100) + 1 updated slice (62->89) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100}) - expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0, slicesChangedPerSync: 2}) // ensure cache mutation has not occurred cmc.Check(t) @@ -645,7 +653,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) { // 2 new slices (100, 52) in addition to existing slices (74, 74) expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, 
client, namespace), []int{74, 74, 100, 52}) - expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 2}) // ensure cache mutation has not occurred cmc.Check(t) @@ -804,7 +812,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) { // thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100}) - expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7}) + expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7, slicesChangedPerSync: 10}) // ensure cache mutation has not occurred cmc.Check(t) @@ -861,7 +869,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) { // ensure that both endpoint slices have been updated expectActions(t, client.Actions(), 2, "update", "endpointslices") - expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0, slicesChangedPerSync: 2}) // additional pods should get added to fuller slice expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20}) @@ -1036,7 +1044,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) { // reconcile should create 5 endpoint slices assert.Equal(t, 5, len(client.Actions()), "Expected 5 client actions as part of reconcile") expectActions(t, client.Actions(), 5, "create", "endpointslices") - expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 5}) fetchedSlices := fetchEndpointSlices(t, client, namespace) @@ -1082,23 +1090,23 @@ func TestReconcileMaxEndpointsPerSlice(t *testing.T) { { maxEndpointsPerSlice: int32(50), expectedSliceLengths: []int{50, 50, 50, 50, 50}, - expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5}, + expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5, slicesChangedPerSync: 5}, }, { maxEndpointsPerSlice: int32(80), expectedSliceLengths: []int{80, 80, 80, 10}, - expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4}, + expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4, slicesChangedPerSync: 4}, }, { maxEndpointsPerSlice: int32(150), expectedSliceLengths: 
[]int{150, 100}, - expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2}, + expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2, slicesChangedPerSync: 2}, }, { maxEndpointsPerSlice: int32(250), expectedSliceLengths: []int{250}, - expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1}, + expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1, slicesChangedPerSync: 1}, }, { maxEndpointsPerSlice: int32(500), expectedSliceLengths: []int{250}, - expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1}, + expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1, slicesChangedPerSync: 1}, }, } @@ -1133,11 +1141,11 @@ func TestReconcileEndpointSlicesMetrics(t *testing.T) { assert.Equal(t, 1, len(actions), "Expected 1 additional client actions as part of reconcile") assert.True(t, actions[0].Matches("create", "endpointslices"), "First action should be create endpoint slice") - expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 1}) fetchedSlices := fetchEndpointSlices(t, client, namespace) reconcileHelper(t, r, &svc, pods[0:10], []*discovery.EndpointSlice{&fetchedSlices[0]}, time.Now()) - expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0}) + expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0, slicesChangedPerSync: 2}) } // When a Service has a non-nil deletionTimestamp we want to avoid creating any @@ -1310,6 +1318,271 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) { } } +func TestReconcileTopology(t *testing.T) { + ns := "testing" + svc, endpointMeta := newServiceAndEndpointMeta("foo", ns) + + // 3 zones, 10 nodes and pods per zone + zones := []string{"zone-a", "zone-b", "zone-c"} + + pods := []*corev1.Pod{} + nodes := []*corev1.Node{} + nodesByName := map[string]*corev1.Node{} + for i, zone := range zones { + for j := 0; j < 10; j++ { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("node-%s-%d", zone, j), + Labels: map[string]string{ + corev1.LabelTopologyZone: zone, + }, + }, + Status: corev1.NodeStatus{ + Conditions: []corev1.NodeCondition{{ + Type: corev1.NodeReady, + Status: corev1.ConditionTrue, + }}, + Allocatable: corev1.ResourceList{"cpu": resource.MustParse("100m")}, + }, + } + nodesByName[node.Name] = node + nodes = append(nodes, node) + + pod := newPod(i*100+j, ns, true, 1, false) + pod.Spec.NodeName = node.Name + pods = append(pods, pod) + } + } + + slicesByName := map[string]*discovery.EndpointSlice{} + slicePods := map[string][]*corev1.Pod{ + "zone-a-b": {pods[7], pods[8], pods[16], pods[17], pods[18]}, + "zone-a-c": {pods[5], pods[6], pods[25], 
pods[26]}, + "zone-c": {pods[27], pods[28], pods[29]}, + } + + gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"} + ownerRef := metav1.NewControllerRef(&svc, gvk) + + for name, pods := range slicePods { + endpoints := []discovery.Endpoint{} + for _, pod := range pods { + endpoints = append(endpoints, podToEndpoint(pod, nodesByName[pod.Spec.NodeName], &svc, endpointMeta.AddressType)) + } + + slicesByName[name] = &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + OwnerReferences: []metav1.OwnerReference{*ownerRef}, + Labels: map[string]string{ + discovery.LabelManagedBy: controllerName, + discovery.LabelServiceName: svc.Name, + }, + }, + AddressType: endpointMeta.AddressType, + Ports: endpointMeta.Ports, + Endpoints: endpoints, + } + } + + testCases := []struct { + name string + topologyCacheEnabled bool + hintsAnnotation string + existingSlices []*discovery.EndpointSlice + pods []*corev1.Pod + nodes []*corev1.Node + expectedHints map[string]int + expectedCrossZoneHints int + expectedMetrics expectedMetrics + }{{ + name: "no change, topologyCache disabled, annotation == auto", + topologyCacheEnabled: false, + hintsAnnotation: "auto", + existingSlices: []*discovery.EndpointSlice{slicesByName["zone-c"]}, + pods: slicePods["zone-c"], + nodes: nodes, + expectedHints: nil, + expectedCrossZoneHints: 0, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 3, + addedPerSync: 0, + removedPerSync: 0, + numCreated: 0, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 0, + }, + }, { + name: "enabling topologyCache, hintsAnnotation == auto", + topologyCacheEnabled: true, + hintsAnnotation: "auto", + existingSlices: []*discovery.EndpointSlice{slicesByName["zone-c"]}, + pods: slicePods["zone-c"], + nodes: nodes, + expectedHints: map[string]int{ + "zone-a": 1, + "zone-b": 1, + "zone-c": 1, + }, + expectedCrossZoneHints: 2, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 3, + addedPerSync: 0, + removedPerSync: 0, + numCreated: 0, + numUpdated: 1, + numDeleted: 0, + slicesChangedPerSyncTopology: 1, + }, + }, { + name: "topology enabled, hintsAnnotation==auto, ratio beyond threshold", + topologyCacheEnabled: true, + hintsAnnotation: "auto", + existingSlices: []*discovery.EndpointSlice{slicesByName["zone-a-c"]}, + pods: slicePods["zone-a-c"], + nodes: nodes, + expectedHints: nil, + expectedCrossZoneHints: 0, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 4, + addedPerSync: 0, + removedPerSync: 0, + numCreated: 0, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSyncTopology: 0, + }, + }, { + name: "topology enabled, hintsAnnotation==auto, more slices and endpoints", + topologyCacheEnabled: true, + hintsAnnotation: "auto", + existingSlices: []*discovery.EndpointSlice{slicesByName["zone-a-c"], slicesByName["zone-a-b"]}, + pods: append(slicePods["zone-a-c"], slicePods["zone-a-b"]...), + nodes: nodes, + expectedHints: map[string]int{ + "zone-a": 3, + "zone-b": 3, + "zone-c": 3, + }, + expectedCrossZoneHints: 1, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 9, + addedPerSync: 0, + removedPerSync: 0, + numCreated: 0, + // TODO(robscott): Since we're potentially changing more slices when + // adding topology hints we could use it as a free repacking + // opportunity. That would make this value 1. 
+ numUpdated: 2, + numDeleted: 0, + slicesChangedPerSyncTopology: 2, + }, + }, { + name: "topology enabled, hintsAnnotation==disabled, more slices and endpoints", + topologyCacheEnabled: true, + hintsAnnotation: "disabled", + existingSlices: []*discovery.EndpointSlice{slicesByName["zone-a-c"], slicesByName["zone-a-b"]}, + pods: append(slicePods["zone-a-c"], slicePods["zone-a-b"]...), + nodes: nodes, + expectedHints: nil, + expectedCrossZoneHints: 0, + expectedMetrics: expectedMetrics{ + desiredSlices: 1, + actualSlices: 1, + desiredEndpoints: 9, + addedPerSync: 0, + removedPerSync: 0, + numCreated: 0, + numUpdated: 0, + numDeleted: 0, + slicesChangedPerSync: 0, + }, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + client := newClientset() + cmc := newCacheMutationCheck(tc.existingSlices) + createEndpointSlices(t, client, ns, tc.existingSlices) + + setupMetrics() + r := newReconciler(client, tc.nodes, defaultMaxEndpointsPerSlice) + if tc.topologyCacheEnabled { + r.topologyCache = topologycache.NewTopologyCache() + r.topologyCache.SetNodes(tc.nodes) + } + + service := svc.DeepCopy() + service.Annotations = map[string]string{ + corev1.AnnotationTopologyAwareHints: tc.hintsAnnotation, + } + r.reconcile(service, tc.pods, tc.existingSlices, time.Now()) + + cmc.Check(t) + expectMetrics(t, tc.expectedMetrics) + fetchedSlices := fetchEndpointSlices(t, client, ns) + + if tc.expectedHints == nil { + for _, slice := range fetchedSlices { + for _, endpoint := range slice.Endpoints { + if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 { + t.Fatalf("Expected endpoint not to have zone hints: %+v", endpoint) + } + } + } + return + } + + actualCrossZoneHints := 0 + actualHints := map[string]int{} + + for _, slice := range fetchedSlices { + for _, endpoint := range slice.Endpoints { + if endpoint.Hints == nil || len(endpoint.Hints.ForZones) == 0 { + t.Fatalf("Expected endpoint to have zone hints: %+v", endpoint) + } + if len(endpoint.Hints.ForZones) > 1 { + t.Fatalf("Expected endpoint to only have 1 zone hint, got %d", len(endpoint.Hints.ForZones)) + } + + if endpoint.Zone == nil || *endpoint.Zone == "" { + t.Fatalf("Expected endpoint to have zone: %+v", endpoint) + } + zoneHint := endpoint.Hints.ForZones[0].Name + if *endpoint.Zone != zoneHint { + actualCrossZoneHints++ + } + actualHints[zoneHint]++ + } + } + + if len(actualHints) != len(tc.expectedHints) { + t.Errorf("Expected hints for %d zones, got %d", len(tc.expectedHints), len(actualHints)) + } + + for zone, expectedNum := range tc.expectedHints { + actualNum, _ := actualHints[zone] + if actualNum != expectedNum { + t.Errorf("Expected %d hints for %s zone, got %d", expectedNum, zone, actualNum) + } + } + + if actualCrossZoneHints != tc.expectedCrossZoneHints { + t.Errorf("Expected %d cross zone hints, got %d", tc.expectedCrossZoneHints, actualCrossZoneHints) + } + }) + } +} + // Test Helpers func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler { @@ -1446,14 +1719,18 @@ func reconcileHelper(t *testing.T, r *reconciler, service *corev1.Service, pods // Metrics helpers type expectedMetrics struct { - desiredSlices int - actualSlices int - desiredEndpoints int - addedPerSync int - removedPerSync int - numCreated int - numUpdated int - numDeleted int + desiredSlices int + actualSlices int + desiredEndpoints int + addedPerSync int + removedPerSync int + numCreated int + numUpdated int + numDeleted int + slicesChangedPerSync int + slicesChangedPerSyncTopology int + 
syncSuccesses int + syncErrors int } func expectMetrics(t *testing.T, em expectedMetrics) { @@ -1506,6 +1783,30 @@ func expectMetrics(t *testing.T, em expectedMetrics) { if actualDeleted != float64(em.numDeleted) { t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted) } + + actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("disabled")) + handleErr(t, err, "slicesChangedPerSync") + if actualSlicesChangedPerSync != float64(em.slicesChangedPerSync) { + t.Errorf("Expected slicesChangedPerSync to be %d, got %v", em.slicesChangedPerSync, actualSlicesChangedPerSync) + } + + actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("auto")) + handleErr(t, err, "slicesChangedPerSyncTopology") + if actualSlicesChangedPerSyncTopology != float64(em.slicesChangedPerSyncTopology) { + t.Errorf("Expected slicesChangedPerSyncTopology to be %d, got %v", em.slicesChangedPerSyncTopology, actualSlicesChangedPerSyncTopology) + } + + actualSyncSuccesses, err := testutil.GetCounterMetricValue(metrics.EndpointSliceSyncs.WithLabelValues("success")) + handleErr(t, err, "syncSuccesses") + if actualSyncSuccesses != float64(em.syncSuccesses) { + t.Errorf("Expected endpointSliceSyncSuccesses to be %d, got %v", em.syncSuccesses, actualSyncSuccesses) + } + + actualSyncErrors, err := testutil.GetCounterMetricValue(metrics.EndpointSliceSyncs.WithLabelValues("error")) + handleErr(t, err, "syncErrors") + if actualSyncErrors != float64(em.syncErrors) { + t.Errorf("Expected endpointSliceSyncErrors to be %d, got %v", em.syncErrors, actualSyncErrors) + } } func handleErr(t *testing.T, err error, metricName string) { @@ -1524,4 +1825,8 @@ func setupMetrics() { metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"}) metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"}) metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"}) + metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "disabled"}) + metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "auto"}) + metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "success"}) + metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "error"}) } diff --git a/pkg/controller/endpointslice/topologycache/sliceinfo.go b/pkg/controller/endpointslice/topologycache/sliceinfo.go new file mode 100644 index 00000000000..f525a1cdee7 --- /dev/null +++ b/pkg/controller/endpointslice/topologycache/sliceinfo.go @@ -0,0 +1,76 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologycache + +import ( + discovery "k8s.io/api/discovery/v1" +) + +// SliceInfo stores information about EndpointSlices for the reconciliation +// process. 
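+// ToCreate and ToUpdate hold the slices the reconciler is about to write;
+// Unchanged slices may still be moved to ToUpdate if their hints need to change.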
+type SliceInfo struct { + ServiceKey string + AddressType discovery.AddressType + ToCreate []*discovery.EndpointSlice + ToUpdate []*discovery.EndpointSlice + Unchanged []*discovery.EndpointSlice +} + +func (si *SliceInfo) getTotalEndpoints() int { + totalEndpoints := 0 + for _, slice := range si.ToCreate { + totalEndpoints += len(slice.Endpoints) + } + for _, slice := range si.ToUpdate { + totalEndpoints += len(slice.Endpoints) + } + for _, slice := range si.Unchanged { + totalEndpoints += len(slice.Endpoints) + } + return totalEndpoints +} + +// getAllocatedHintsByZone sums up the allocated hints we currently have in +// unchanged slices and marks slices for update as necessary. A slice needs to +// be updated if any of the following are true: +// - It has an endpoint without zone hints +// - It has an endpoint hint for a zone that no longer needs any +// - It has endpoint hints that would make the minimum allocations necessary +// impossible with changes to slices that are already being updated or +// created. +func (si *SliceInfo) getAllocatedHintsByZone(allocations map[string]Allocation) EndpointZoneInfo { + allocatedHintsByZone := EndpointZoneInfo{} + + // Using filtering in place to remove any endpoints that are no longer + // unchanged (https://github.com/golang/go/wiki/SliceTricks#filter-in-place) + j := 0 + for _, slice := range si.Unchanged { + hintsByZone := getHintsByZone(slice, allocatedHintsByZone, allocations) + if hintsByZone == nil { + si.ToUpdate = append(si.ToUpdate, slice.DeepCopy()) + } else { + si.Unchanged[j] = slice + j++ + for zone, numHints := range hintsByZone { + allocatedHintsByZone[zone] += numHints + } + } + } + + si.Unchanged = si.Unchanged[:j] + return allocatedHintsByZone +} diff --git a/pkg/controller/endpointslice/topologycache/sliceinfo_test.go b/pkg/controller/endpointslice/topologycache/sliceinfo_test.go new file mode 100644 index 00000000000..36627f637eb --- /dev/null +++ b/pkg/controller/endpointslice/topologycache/sliceinfo_test.go @@ -0,0 +1,79 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package topologycache + +import ( + "fmt" + "testing" + + discovery "k8s.io/api/discovery/v1" +) + +func TestGetTotalEndpoints(t *testing.T) { + testCases := []struct { + name string + si *SliceInfo + expectedTotal int + }{{ + name: "empty", + si: &SliceInfo{}, + expectedTotal: 0, + }, { + name: "empty slice", + si: &SliceInfo{ + ToCreate: []*discovery.EndpointSlice{sliceWithNEndpoints(0)}, + }, + expectedTotal: 0, + }, { + name: "multiple slices", + si: &SliceInfo{ + ToCreate: []*discovery.EndpointSlice{sliceWithNEndpoints(15), sliceWithNEndpoints(8)}, + }, + expectedTotal: 23, + }, { + name: "slices for all", + si: &SliceInfo{ + ToCreate: []*discovery.EndpointSlice{sliceWithNEndpoints(15), sliceWithNEndpoints(8)}, + ToUpdate: []*discovery.EndpointSlice{sliceWithNEndpoints(2)}, + Unchanged: []*discovery.EndpointSlice{sliceWithNEndpoints(100), sliceWithNEndpoints(90)}, + }, + expectedTotal: 215, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualTotal := tc.si.getTotalEndpoints() + if actualTotal != tc.expectedTotal { + t.Errorf("Expected %d, got %d", tc.expectedTotal, actualTotal) + } + }) + } +} + +// helpers + +func sliceWithNEndpoints(n int) *discovery.EndpointSlice { + endpoints := []discovery.Endpoint{} + + for i := 0; i < n; i++ { + endpoints = append(endpoints, discovery.Endpoint{Addresses: []string{fmt.Sprintf("10.1.2.%d", i)}}) + } + + return &discovery.EndpointSlice{ + Endpoints: endpoints, + } +} diff --git a/pkg/controller/endpointslice/topologycache/topologycache.go b/pkg/controller/endpointslice/topologycache/topologycache.go new file mode 100644 index 00000000000..7ea9afb2e84 --- /dev/null +++ b/pkg/controller/endpointslice/topologycache/topologycache.go @@ -0,0 +1,252 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologycache + +import ( + "math" + "sync" + + "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/klog/v2" +) + +const ( + // OverloadThreshold represents the maximum overload any individual endpoint + // should be exposed to. + OverloadThreshold float64 = 0.2 +) + +// TopologyCache tracks the distribution of Nodes and endpoints across zones. +type TopologyCache struct { + lock sync.Mutex + sufficientNodeInfo bool + cpuByZone map[string]*resource.Quantity + cpuRatiosByZone map[string]float64 + endpointsByService map[string]map[discovery.AddressType]EndpointZoneInfo +} + +// EndpointZoneInfo tracks the distribution of endpoints across zones for a +// Service. +type EndpointZoneInfo map[string]int + +// Allocation describes the number of endpoints that should be allocated for a +// zone. +type Allocation struct { + Minimum int + Maximum int + Desired float64 +} + +// NewTopologyCache initializes a new TopologyCache. 
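+// The returned cache holds no Node or endpoint data until SetNodes and
+// SetHints (or AddHints) are called.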
+func NewTopologyCache() *TopologyCache { + return &TopologyCache{ + cpuByZone: map[string]*resource.Quantity{}, + cpuRatiosByZone: map[string]float64{}, + endpointsByService: map[string]map[discovery.AddressType]EndpointZoneInfo{}, + } +} + +// GetOverloadedServices returns a list of Service keys that refer to Services +// that have crossed the overload threshold for any zone. +func (t *TopologyCache) GetOverloadedServices() []string { + t.lock.Lock() + defer t.lock.Unlock() + + svcKeys := []string{} + for svcKey, eziByAddrType := range t.endpointsByService { + for _, ezi := range eziByAddrType { + if serviceOverloaded(ezi, t.cpuRatiosByZone) { + svcKeys = append(svcKeys, svcKey) + break + } + } + } + + return svcKeys +} + +// AddHints adds or updates topology hints on EndpointSlices and returns updated +// lists of EndpointSlices to create and update. +func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice) { + totalEndpoints := si.getTotalEndpoints() + allocations := t.getAllocations(totalEndpoints) + + if allocations == nil { + klog.V(2).Infof("Insufficient endpoints, removing hints from %s Service", si.ServiceKey) + t.RemoveHints(si.ServiceKey, si.AddressType) + return RemoveHintsFromSlices(si) + } + + allocatedHintsByZone := si.getAllocatedHintsByZone(allocations) + + allocatableSlices := si.ToCreate + for _, slice := range si.ToUpdate { + allocatableSlices = append(allocatableSlices, slice) + } + + // step 1: assign same-zone hints for all endpoints as a starting point. + for _, slice := range allocatableSlices { + for i, endpoint := range slice.Endpoints { + if endpoint.Zone == nil || *endpoint.Zone == "" { + klog.Warningf("Endpoint found without zone specified, removing hints from %s Service", si.ServiceKey) + t.RemoveHints(si.ServiceKey, si.AddressType) + return RemoveHintsFromSlices(si) + } + + allocatedHintsByZone[*endpoint.Zone]++ + slice.Endpoints[i].Hints = &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: *endpoint.Zone}}} + } + } + + // step 2. Identify which zones need to donate slices and which need more. + givingZones, receivingZones := getGivingAndReceivingZones(allocations, allocatedHintsByZone) + + // step 3. Redistribute endpoints based on data from step 2. + redistributions := redistributeHints(allocatableSlices, givingZones, receivingZones) + + for zone, diff := range redistributions { + allocatedHintsByZone[zone] += diff + } + + t.SetHints(si.ServiceKey, si.AddressType, allocatedHintsByZone) + return si.ToCreate, si.ToUpdate +} + +// SetHints sets topology hints for the provided serviceKey and addrType in this +// cache. +func (t *TopologyCache) SetHints(serviceKey string, addrType discovery.AddressType, allocatedHintsByZone EndpointZoneInfo) { + if len(allocatedHintsByZone) == 0 { + t.RemoveHints(serviceKey, addrType) + return + } + + t.lock.Lock() + defer t.lock.Unlock() + + _, ok := t.endpointsByService[serviceKey] + if !ok { + t.endpointsByService[serviceKey] = map[discovery.AddressType]EndpointZoneInfo{} + } + t.endpointsByService[serviceKey][addrType] = allocatedHintsByZone +} + +// RemoveHints removes topology hints for the provided serviceKey and addrType +// from this cache. 
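+// Once no address types remain for a serviceKey, its entry is deleted entirely.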
+func (t *TopologyCache) RemoveHints(serviceKey string, addrType discovery.AddressType) { + t.lock.Lock() + defer t.lock.Unlock() + + _, ok := t.endpointsByService[serviceKey] + if ok { + delete(t.endpointsByService[serviceKey], addrType) + } + if len(t.endpointsByService[serviceKey]) == 0 { + delete(t.endpointsByService, serviceKey) + } +} + +// SetNodes updates the Node distribution for the TopologyCache. +func (t *TopologyCache) SetNodes(nodes []*v1.Node) { + cpuByZone := map[string]*resource.Quantity{} + sufficientNodeInfo := true + + totalCPU := resource.Quantity{} + + for _, node := range nodes { + if !NodeReady(node.Status) { + continue + } + nodeCPU := node.Status.Allocatable.Cpu() + zone, ok := node.Labels[v1.LabelTopologyZone] + + // TODO(robscott): Figure out if there's an acceptable proportion of + // nodes with inadequate information. The current logic means that as + // soon as we find any node without a zone or allocatable CPU specified, + // we bail out entirely. Bailing out at this level will make our cluster + // wide ratios nil, which would result in slices for all Services having + // their hints removed. + if !ok || zone == "" || nodeCPU.IsZero() { + cpuByZone = map[string]*resource.Quantity{} + sufficientNodeInfo = false + break + } + + totalCPU.Add(*nodeCPU) + if _, ok = cpuByZone[zone]; !ok { + cpuByZone[zone] = nodeCPU + } else { + cpuByZone[zone].Add(*nodeCPU) + } + } + + t.lock.Lock() + defer t.lock.Unlock() + + if totalCPU.IsZero() || !sufficientNodeInfo || len(cpuByZone) < 2 { + t.sufficientNodeInfo = false + t.cpuByZone = nil + t.cpuRatiosByZone = nil + + } else { + t.sufficientNodeInfo = sufficientNodeInfo + t.cpuByZone = cpuByZone + + t.cpuRatiosByZone = map[string]float64{} + for zone, cpu := range cpuByZone { + t.cpuRatiosByZone[zone] = float64(cpu.MilliValue()) / float64(totalCPU.MilliValue()) + } + } +} + +// getAllocations returns a set of minimum and maximum allocations per zone. If +// it is not possible to provide allocations that are below the overload +// threshold, a nil value will be returned. +func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation { + if t.cpuRatiosByZone == nil || len(t.cpuRatiosByZone) < 2 || len(t.cpuRatiosByZone) > numEndpoints { + return nil + } + + t.lock.Lock() + defer t.lock.Unlock() + + remainingMinEndpoints := numEndpoints + minTotal := 0 + allocations := map[string]Allocation{} + + for zone, ratio := range t.cpuRatiosByZone { + desired := ratio * float64(numEndpoints) + minimum := int(math.Ceil(desired * (1 / (1 + OverloadThreshold)))) + allocations[zone] = Allocation{ + Minimum: minimum, + Desired: math.Max(desired, float64(minimum)), + } + minTotal += minimum + remainingMinEndpoints -= minimum + if remainingMinEndpoints < 0 { + return nil + } + } + + for zone, allocation := range allocations { + allocation.Maximum = allocation.Minimum + numEndpoints - minTotal + allocations[zone] = allocation + } + + return allocations +} diff --git a/pkg/controller/endpointslice/topologycache/topologycache_test.go b/pkg/controller/endpointslice/topologycache/topologycache_test.go new file mode 100644 index 00000000000..6f8bb369a9f --- /dev/null +++ b/pkg/controller/endpointslice/topologycache/topologycache_test.go @@ -0,0 +1,486 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologycache + +import ( + "reflect" + "testing" + + "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilpointer "k8s.io/utils/pointer" +) + +func TestAddHints(t *testing.T) { + testCases := []struct { + name string + cpuRatiosByZone map[string]float64 + sliceInfo *SliceInfo + expectedEndpointsByAddrType map[discovery.AddressType]EndpointZoneInfo + expectedSlicesToCreate []*discovery.EndpointSlice + expectedSlicesToUpdate []*discovery.EndpointSlice + }{{ + name: "empty", + cpuRatiosByZone: nil, + sliceInfo: &SliceInfo{ + ServiceKey: "ns/svc", + AddressType: discovery.AddressTypeIPv4, + }, + expectedEndpointsByAddrType: nil, + expectedSlicesToCreate: []*discovery.EndpointSlice{}, + expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + }, { + name: "slice to create, no zone ratios", + cpuRatiosByZone: nil, + sliceInfo: &SliceInfo{ + ServiceKey: "ns/svc", + AddressType: discovery.AddressTypeIPv4, + ToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + }}, + }}, + }, + expectedEndpointsByAddrType: nil, + expectedSlicesToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + }}, + }}, + expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + }, { + name: "slice to create with 2 endpoints, zone ratios require 3", + cpuRatiosByZone: map[string]float64{ + "zone-a": 0.3, + "zone-b": 0.4, + "zone-c": 0.3, + }, + sliceInfo: &SliceInfo{ + ServiceKey: "ns/svc", + AddressType: discovery.AddressTypeIPv4, + ToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + }, { + Addresses: []string{"10.1.2.4"}, + Zone: utilpointer.StringPtr("zone-b"), + }}, + }}, + }, + expectedEndpointsByAddrType: nil, + expectedSlicesToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + }, { + Addresses: []string{"10.1.2.4"}, + Zone: utilpointer.StringPtr("zone-b"), + }}, + }}, + expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + }, { + name: "slice to create with 2 endpoints, zone ratios only require 2", + cpuRatiosByZone: map[string]float64{ + "zone-a": 0.45, + "zone-b": 0.55, + }, + sliceInfo: &SliceInfo{ + ServiceKey: "ns/svc", + AddressType: discovery.AddressTypeIPv4, + ToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + }, { + Addresses: []string{"10.1.2.4"}, + Zone: utilpointer.StringPtr("zone-b"), + }}, + }}, + }, + expectedEndpointsByAddrType: map[discovery.AddressType]EndpointZoneInfo{ + discovery.AddressTypeIPv4: { + "zone-a": 1, + "zone-b": 1, + }, + }, + expectedSlicesToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + 
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }, { + Addresses: []string{"10.1.2.4"}, + Zone: utilpointer.StringPtr("zone-b"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}}, + }}, + }}, + expectedSlicesToUpdate: []*discovery.EndpointSlice{}, + }, { + name: "slices to create and update within 3 zone threshold", + cpuRatiosByZone: map[string]float64{ + "zone-a": 0.35, + "zone-b": 0.35, + "zone-c": 0.30, + }, + sliceInfo: &SliceInfo{ + ServiceKey: "ns/svc", + AddressType: discovery.AddressTypeIPv4, + ToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + }, { + Addresses: []string{"10.1.2.4"}, + Zone: utilpointer.StringPtr("zone-b"), + }}, + }, { + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.3.3"}, + Zone: utilpointer.StringPtr("zone-c"), + }, { + Addresses: []string{"10.1.3.4"}, + Zone: utilpointer.StringPtr("zone-c"), + }, { + Addresses: []string{"10.1.3.4"}, + Zone: utilpointer.StringPtr("zone-a"), + }}, + }}, + ToUpdate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.2.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + }, { + Addresses: []string{"10.2.2.4"}, + Zone: utilpointer.StringPtr("zone-a"), + }}, + }, { + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.2.3.3"}, + Zone: utilpointer.StringPtr("zone-b"), + }, { + Addresses: []string{"10.2.3.4"}, + Zone: utilpointer.StringPtr("zone-c"), + }, { + Addresses: []string{"10.2.3.4"}, + Zone: utilpointer.StringPtr("zone-a"), + }}, + }}, + }, + expectedEndpointsByAddrType: map[discovery.AddressType]EndpointZoneInfo{ + discovery.AddressTypeIPv4: { + "zone-a": 4, + "zone-b": 3, + "zone-c": 3, + }, + }, + expectedSlicesToCreate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}}, + }, { + Addresses: []string{"10.1.2.4"}, + Zone: utilpointer.StringPtr("zone-b"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}}, + }}, + }, { + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.1.3.3"}, + Zone: utilpointer.StringPtr("zone-c"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-c"}}}, + }, { + Addresses: []string{"10.1.3.4"}, + Zone: utilpointer.StringPtr("zone-c"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-c"}}}, + }, { + Addresses: []string{"10.1.3.4"}, + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }}, + }}, + expectedSlicesToUpdate: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.2.2.3"}, + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }, { + Addresses: []string{"10.2.2.4"}, + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }}, + }, { + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"10.2.3.3"}, + Zone: utilpointer.StringPtr("zone-b"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}}, + }, { + Addresses: []string{"10.2.3.4"}, + Zone: utilpointer.StringPtr("zone-c"), + Hints: &discovery.EndpointHints{ForZones: 
[]discovery.ForZone{{Name: "zone-c"}}}, + }, { + Addresses: []string{"10.2.3.4"}, + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }}, + }}, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := NewTopologyCache() + cache.cpuRatiosByZone = tc.cpuRatiosByZone + + slicesToCreate, slicesToUpdate := cache.AddHints(tc.sliceInfo) + + expectEquivalentSlices(t, slicesToCreate, tc.expectedSlicesToCreate) + expectEquivalentSlices(t, slicesToUpdate, tc.expectedSlicesToUpdate) + + endpointsByAddrType, ok := cache.endpointsByService[tc.sliceInfo.ServiceKey] + if tc.expectedEndpointsByAddrType == nil { + if ok { + t.Errorf("Expected no endpoints for Service %s, got %+v", tc.sliceInfo.ServiceKey, endpointsByAddrType) + } + } else { + if len(tc.expectedEndpointsByAddrType) != len(endpointsByAddrType) { + t.Fatalf("Expected endpoints for %d address types, got %d", len(tc.expectedEndpointsByAddrType), len(endpointsByAddrType)) + } + for addrType, expectedEndpointZoneInfo := range tc.expectedEndpointsByAddrType { + endpointZoneInfo, ok := endpointsByAddrType[addrType] + if !ok { + t.Fatalf("Expected endpoints for %s address type, got none", addrType) + } + + if len(expectedEndpointZoneInfo) != len(endpointZoneInfo) { + t.Fatalf("Expected endpoints for %d zones, got %d", len(expectedEndpointZoneInfo), len(endpointZoneInfo)) + } + + for zone, expectedNum := range expectedEndpointZoneInfo { + num, ok := endpointZoneInfo[zone] + if !ok { + t.Fatalf("Expected endpoints for %s zone, got none", zone) + } + if num != expectedNum { + t.Errorf("Expected %d endpoints for %s zone, got %d", expectedNum, zone, num) + } + } + } + } + }) + } +} + +func TestSetNodes(t *testing.T) { + type nodeInfo struct { + zone string + cpu resource.Quantity + ready v1.ConditionStatus + } + + testCases := []struct { + name string + nodes []nodeInfo + expectSufficientNodeInfo bool + expectedCPUByZone map[string]*resource.Quantity + expectedRatios map[string]float64 + }{{ + name: "empty", + nodes: []nodeInfo{}, + expectSufficientNodeInfo: false, + expectedCPUByZone: nil, + expectedRatios: nil, + }, { + name: "single node", + nodes: []nodeInfo{ + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + }, + expectSufficientNodeInfo: false, + expectedCPUByZone: nil, + expectedRatios: nil, + }, { + name: "single zone", + nodes: []nodeInfo{ + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + }, + expectSufficientNodeInfo: false, + expectedCPUByZone: nil, + expectedRatios: nil, + }, { + name: "2 zones", + nodes: []nodeInfo{ + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + {zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + }, + expectSufficientNodeInfo: true, + expectedCPUByZone: map[string]*resource.Quantity{ + "zone-a": resource.NewQuantity(1, resource.BinarySI), + "zone-b": resource.NewQuantity(1, resource.BinarySI), + }, + expectedRatios: map[string]float64{ + "zone-a": 0.5, + "zone-b": 0.5, + }, + }, { + name: "2 zones, unready node in 1, ready node in 1", + nodes: []nodeInfo{ + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionFalse}, + {zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + }, + expectSufficientNodeInfo: false, + expectedCPUByZone: nil, + expectedRatios: nil, + }, { + name: "2 zones, 
unready node in 1, ready node in 2", + nodes: []nodeInfo{ + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + {zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + {zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionFalse}, + }, + expectSufficientNodeInfo: true, + expectedCPUByZone: map[string]*resource.Quantity{ + "zone-a": resource.NewQuantity(1, resource.BinarySI), + "zone-b": resource.NewQuantity(1, resource.BinarySI), + }, + expectedRatios: map[string]float64{ + "zone-a": 0.5, + "zone-b": 0.5, + }, + }, { + name: "3 zones, 4 nodes in 1, 2 nodes in 1, 1 node in 1", + nodes: []nodeInfo{ + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + {zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue}, + {zone: "zone-a", cpu: resource.MustParse("2000m"), ready: v1.ConditionTrue}, + {zone: "zone-b", cpu: resource.MustParse("3000m"), ready: v1.ConditionTrue}, + {zone: "zone-b", cpu: resource.MustParse("1500m"), ready: v1.ConditionTrue}, + {zone: "zone-c", cpu: resource.MustParse("500m"), ready: v1.ConditionTrue}, + }, + expectSufficientNodeInfo: true, + expectedCPUByZone: map[string]*resource.Quantity{ + "zone-a": resource.NewMilliQuantity(5000, resource.BinarySI), + "zone-b": resource.NewMilliQuantity(4500, resource.BinarySI), + "zone-c": resource.NewMilliQuantity(500, resource.BinarySI), + }, + expectedRatios: map[string]float64{ + "zone-a": 0.5, + "zone-b": 0.45, + "zone-c": 0.05, + }, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := NewTopologyCache() + nodes := make([]*v1.Node, 0, len(tc.nodes)) + for _, node := range tc.nodes { + labels := map[string]string{} + if node.zone != "" { + labels[v1.LabelTopologyZone] = node.zone + } + conditions := []v1.NodeCondition{{ + Type: v1.NodeReady, + Status: node.ready, + }} + allocatable := v1.ResourceList{ + v1.ResourceCPU: node.cpu, + } + nodes = append(nodes, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Status: v1.NodeStatus{ + Allocatable: allocatable, + Conditions: conditions, + }, + }) + } + + cache.SetNodes(nodes) + + if cache.sufficientNodeInfo != tc.expectSufficientNodeInfo { + t.Errorf("Expected sufficientNodeInfo to be %t, got %t", tc.expectSufficientNodeInfo, cache.sufficientNodeInfo) + } + + if cache.cpuRatiosByZone == nil || tc.expectedRatios == nil { + if (cache.cpuRatiosByZone == nil) != (tc.expectedRatios == nil) { + t.Errorf("Expected %+v, got %+v", tc.expectedRatios, cache.cpuRatiosByZone) + } + } else { + if len(cache.cpuRatiosByZone) != len(tc.expectedRatios) { + t.Errorf("Expected ratios with %d zones, got %d", len(tc.expectedRatios), len(cache.cpuRatiosByZone)) + } + for zone, expectedRatio := range tc.expectedRatios { + actualRatio, ok := cache.cpuRatiosByZone[zone] + if !ok { + t.Errorf("Expected ratio for %s zone, got none", zone) + } else if actualRatio != expectedRatio { + t.Errorf("Expected ratio to be %f, got %f", expectedRatio, actualRatio) + } + } + } + + if cache.cpuByZone == nil || tc.expectedCPUByZone == nil { + if (cache.cpuByZone == nil) != (tc.expectedCPUByZone == nil) { + t.Errorf("Expected %+v, got %+v", tc.expectedCPUByZone, cache.cpuByZone) + } + } else { + if len(cache.cpuByZone) != len(tc.expectedCPUByZone) { + t.Errorf("Expected CPU with %d zones, got %d", len(tc.expectedCPUByZone), len(cache.cpuByZone)) + } + for zone, expectedCPU := range 
tc.expectedCPUByZone { + actualCPU, ok := cache.cpuByZone[zone] + if !ok { + t.Errorf("Expected CPU for %s zone, got none", zone) + } else if !actualCPU.Equal(*expectedCPU) { + t.Errorf("Expected CPU to be %d, got %d", expectedCPU.MilliValue(), actualCPU.MilliValue()) + } + } + } + }) + } +} + +// Test Helpers + +func expectEquivalentSlices(t *testing.T, actualSlices, expectedSlices []*discovery.EndpointSlice) { + t.Helper() + + if len(actualSlices) != len(expectedSlices) { + t.Fatalf("Expected %d slices, got %d", len(expectedSlices), len(actualSlices)) + } + + for i, expectedSlice := range expectedSlices { + actualSlice := actualSlices[i] + + if len(expectedSlice.Endpoints) != len(actualSlice.Endpoints) { + t.Errorf("Expected %d endpoints, got %d", len(expectedSlice.Endpoints), len(actualSlice.Endpoints)) + continue + } + for j, expectedEndpoint := range expectedSlice.Endpoints { + actualEndpoint := actualSlice.Endpoints[j] + if !reflect.DeepEqual(actualEndpoint, expectedEndpoint) { + t.Errorf("Endpoints didn't match\nExpected: %+v\nGot: %+v", expectedEndpoint, actualEndpoint) + } + } + } +} diff --git a/pkg/controller/endpointslice/topologycache/utils.go b/pkg/controller/endpointslice/topologycache/utils.go new file mode 100644 index 00000000000..d5a6f0b9bc8 --- /dev/null +++ b/pkg/controller/endpointslice/topologycache/utils.go @@ -0,0 +1,246 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package topologycache + +import ( + "math" + + "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + "k8s.io/klog/v2" +) + +// RemoveHintsFromSlices removes topology hints on EndpointSlices and returns +// updated lists of EndpointSlices to create and update. +func RemoveHintsFromSlices(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice) { + // Remove hints on all EndpointSlices we were already going to change. + slices := append(si.ToCreate, si.ToUpdate...) + for _, slice := range slices { + for i := range slice.Endpoints { + slice.Endpoints[i].Hints = nil + } + } + + // Remove hints on all unchanged EndpointSlices and mark them for update + // if any already had hints. We use j to track the number/index of slices + // that are still unchanged. + j := 0 + for _, slice := range si.Unchanged { + changed := false + for i, endpoint := range slice.Endpoints { + if endpoint.Hints != nil { + // Unchanged slices are still direct copies from informer cache. + // Need to deep copy before we make any modifications to avoid + // accidentally changing informer cache. + slice = slice.DeepCopy() + slice.Endpoints[i].Hints = nil + changed = true + } + } + if changed { + si.ToUpdate = append(si.ToUpdate, slice) + } else { + si.Unchanged[j] = slice + j++ + } + } + + // truncate si.Unchanged so it only includes slices that are still + // unchanged. + si.Unchanged = si.Unchanged[:j] + + return si.ToCreate, si.ToUpdate +} + +// redistributeHints redistributes hints based in the provided EndpointSlices. 
+// It allocates endpoints from the provided givingZones to the provided +// receivingZones. This returns a map that represents the changes in allocated +// endpoints by zone. +func redistributeHints(slices []*discovery.EndpointSlice, givingZones, receivingZones map[string]int) map[string]int { + redistributions := map[string]int{} + + for _, slice := range slices { + for i, endpoint := range slice.Endpoints { + if len(givingZones) == 0 || len(receivingZones) == 0 { + return redistributions + } + if endpoint.Zone == nil || *endpoint.Zone == "" { + // This should always be caught earlier in AddHints() + klog.Warningf("Endpoint found without zone specified") + continue + } + + givingZone := *endpoint.Zone + numToGive, ok := givingZones[givingZone] + if ok && numToGive > 0 { + for receivingZone, numToReceive := range receivingZones { + if numToReceive > 0 { + slice.Endpoints[i].Hints = &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: receivingZone}}} + if numToGive == 1 { + delete(givingZones, givingZone) + } else { + givingZones[givingZone]-- + } + if numToReceive == 1 { + delete(receivingZones, receivingZone) + } else { + receivingZones[receivingZone]-- + } + + redistributions[receivingZone]++ + redistributions[givingZone]-- + + break + } + } + } + } + } + return redistributions +} + +// getGivingAndReceivingZones returns the number of endpoints each zone should +// give to other zones along with the number of endpoints each zone should +// receive from other zones. This is calculated with the provided allocations +// (desired state) and allocatedHintsByZone (current state). +func getGivingAndReceivingZones(allocations map[string]Allocation, allocatedHintsByZone map[string]int) (map[string]int, map[string]int) { + // 1. Determine the precise number of additional endpoints each zone has + // (giving) or needs (receiving). + givingZonesDesired := map[string]float64{} + receivingZonesDesired := map[string]float64{} + + for zone, allocation := range allocations { + allocatedHints, _ := allocatedHintsByZone[zone] + target := allocation.Desired + if float64(allocatedHints) > target { + givingZonesDesired[zone] = float64(allocatedHints) - target + } else if float64(allocatedHints) < target { + receivingZonesDesired[zone] = target - float64(allocatedHints) + } + } + + // 2. Convert the precise numbers needed into ints representing real + // endpoints given from one zone to another. + givingZones := map[string]int{} + receivingZones := map[string]int{} + + for { + givingZone, numToGive := getMost(givingZonesDesired) + receivingZone, numToReceive := getMost(receivingZonesDesired) + + // return early if any of the following are true: + // - giving OR receiving zone are unspecified + // - giving AND receiving zones have less than 1 endpoint left to give or receive + // - giving OR receiving zones have less than 0.5 endpoints left to give or receive + if givingZone == "" || receivingZone == "" || (numToGive < 1.0 && numToReceive < 1.0) || numToGive < 0.5 || numToReceive < 0.5 { + break + } + + givingZones[givingZone]++ + givingZonesDesired[givingZone]-- + receivingZones[receivingZone]++ + receivingZonesDesired[receivingZone]-- + } + + return givingZones, receivingZones +} + +// getMost accepts a map[string]float64 and returns the string and float64 that +// represent the greatest value in this provided map. This function is not very +// efficient but it is expected that len() will rarely be greater than 2. 
+func getMost(zones map[string]float64) (string, float64) {
+	zone := ""
+	num := 0.0
+	for z, n := range zones {
+		if n > num {
+			zone = z
+			num = n
+		}
+	}
+
+	return zone, num
+}
+
+// getHintsByZone returns the number of hints allocated to each zone by the
+// provided EndpointSlice. This function returns nil to indicate that the
+// current allocations are invalid and that the EndpointSlice needs to be
+// updated. This could be caused by:
+// - A hint for a zone that no longer requires any allocations.
+// - An endpoint with no hints.
+// - Hints that would make minimum allocations impossible.
+func getHintsByZone(slice *discovery.EndpointSlice, allocatedHintsByZone EndpointZoneInfo, allocations map[string]Allocation) map[string]int {
+	hintsByZone := map[string]int{}
+	for _, endpoint := range slice.Endpoints {
+		if endpoint.Hints == nil || len(endpoint.Hints.ForZones) == 0 {
+			return nil
+		}
+		zone := endpoint.Hints.ForZones[0].Name
+		// A hint for a zone that no longer has an allocation means the slice
+		// needs to be updated.
+		if _, ok := allocations[zone]; !ok {
+			return nil
+		}
+		hintsByZone[zone]++
+	}
+
+	for zone, numHints := range hintsByZone {
+		alreadyAllocated, _ := allocatedHintsByZone[zone]
+		allocation, ok := allocations[zone]
+		if !ok || (numHints+alreadyAllocated) > allocation.Maximum {
+			return nil
+		}
+	}
+
+	return hintsByZone
+}
+
+// serviceOverloaded returns true if the Service has an insufficient amount of
+// endpoints for any zone.
+func serviceOverloaded(ezi EndpointZoneInfo, zoneRatios map[string]float64) bool {
+	if len(ezi) == 0 {
+		return false
+	}
+	if len(zoneRatios) == 0 {
+		return true
+	}
+
+	totalEndpoints := 0.0
+	for _, numEndpoints := range ezi {
+		totalEndpoints += float64(numEndpoints)
+	}
+
+	for zone, ratio := range zoneRatios {
+		svcEndpoints, ok := ezi[zone]
+		if !ok {
+			return true
+		}
+		minEndpoints := math.Ceil(totalEndpoints * ratio * (1 / (1 + OverloadThreshold)))
+		if svcEndpoints < int(minEndpoints) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// NodeReady returns true if the Node has a status condition of type "NodeReady"
+// with a status of "True".
+func NodeReady(nodeStatus v1.NodeStatus) bool {
+	for _, cond := range nodeStatus.Conditions {
+		if cond.Type == v1.NodeReady {
+			return cond.Status == v1.ConditionTrue
+		}
+	}
+	return false
+}
diff --git a/pkg/controller/endpointslice/topologycache/utils_test.go b/pkg/controller/endpointslice/topologycache/utils_test.go
new file mode 100644
index 00000000000..1ecb911ba8d
--- /dev/null
+++ b/pkg/controller/endpointslice/topologycache/utils_test.go
@@ -0,0 +1,195 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package topologycache + +import ( + "reflect" + "testing" + + discovery "k8s.io/api/discovery/v1" + utilpointer "k8s.io/utils/pointer" +) + +func Test_redistributeHints(t *testing.T) { + testCases := []struct { + name string + slices []*discovery.EndpointSlice + givingZones map[string]int + receivingZones map[string]int + expectedRedistributions map[string]int + }{{ + name: "empty", + slices: []*discovery.EndpointSlice{}, + givingZones: map[string]int{}, + receivingZones: map[string]int{}, + expectedRedistributions: map[string]int{}, + }, { + name: "single endpoint", + slices: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }}, + }}, + givingZones: map[string]int{"zone-a": 1}, + receivingZones: map[string]int{"zone-b": 1}, + expectedRedistributions: map[string]int{"zone-a": -1, "zone-b": 1}, + }, { + name: "endpoints from 1 zone redistributed to 2 other zones", + slices: []*discovery.EndpointSlice{{ + Endpoints: []discovery.Endpoint{{ + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }, { + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }, { + Zone: utilpointer.StringPtr("zone-a"), + Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}}, + }}, + }}, + givingZones: map[string]int{"zone-a": 2}, + receivingZones: map[string]int{"zone-b": 1, "zone-c": 1}, + expectedRedistributions: map[string]int{"zone-a": -2, "zone-b": 1, "zone-c": 1}, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualRedistributions := redistributeHints(tc.slices, tc.givingZones, tc.receivingZones) + + if len(actualRedistributions) != len(tc.expectedRedistributions) { + t.Fatalf("Expected redistributions for %d zones, got %d (%+v)", len(tc.expectedRedistributions), len(actualRedistributions), actualRedistributions) + } + + for zone, expectedNum := range tc.expectedRedistributions { + actualNum, _ := actualRedistributions[zone] + if actualNum != expectedNum { + t.Errorf("Expected redistribution of %d for zone %s, got %d", expectedNum, zone, actualNum) + } + } + }) + } +} + +func Test_getGivingAndReceivingZones(t *testing.T) { + testCases := []struct { + name string + allocations map[string]Allocation + allocatedHintsByZone map[string]int + expectedGivingZones map[string]int + expectedReceivingZones map[string]int + }{{ + name: "empty", + allocations: map[string]Allocation{}, + allocatedHintsByZone: map[string]int{}, + expectedGivingZones: map[string]int{}, + expectedReceivingZones: map[string]int{}, + }, { + name: "simple allocation with no need for rebalancing", + allocations: map[string]Allocation{ + "zone-a": {Desired: 1.2}, + "zone-b": {Desired: 1.1}, + "zone-c": {Desired: 1.0}, + }, + allocatedHintsByZone: map[string]int{"zone-a": 1, "zone-b": 1, "zone-c": 1}, + expectedGivingZones: map[string]int{}, + expectedReceivingZones: map[string]int{}, + }, { + name: "preference for same zone even when giving an extra endpoint would result in slightly better distribution", + allocations: map[string]Allocation{ + "zone-a": {Desired: 5.1}, + "zone-b": {Desired: 5.1}, + "zone-c": {Desired: 5.8}, + }, + allocatedHintsByZone: map[string]int{"zone-a": 16}, + expectedGivingZones: map[string]int{"zone-a": 10}, + expectedReceivingZones: map[string]int{"zone-b": 5, "zone-c": 
5}, + }, { + name: "when 2 zones need < 1 endpoint, give to zone that needs endpoint most", + allocations: map[string]Allocation{ + "zone-a": {Desired: 5.0}, + "zone-b": {Desired: 5.6}, + "zone-c": {Desired: 5.4}, + }, + allocatedHintsByZone: map[string]int{"zone-a": 16}, + expectedGivingZones: map[string]int{"zone-a": 11}, + expectedReceivingZones: map[string]int{"zone-b": 6, "zone-c": 5}, + }, { + name: "when 2 zones have extra endpoints, give from zone with most extra", + allocations: map[string]Allocation{ + "zone-a": {Desired: 5.0}, + "zone-b": {Desired: 5.6}, + "zone-c": {Desired: 5.4}, + }, + allocatedHintsByZone: map[string]int{"zone-b": 8, "zone-c": 8}, + expectedGivingZones: map[string]int{"zone-b": 2, "zone-c": 3}, + expectedReceivingZones: map[string]int{"zone-a": 5}, + }, { + name: "ensure function can handle unexpected data (more allocated than allocations)", + allocations: map[string]Allocation{ + "zone-a": {Desired: 5.0}, + "zone-b": {Desired: 5.0}, + "zone-c": {Desired: 5.0}, + }, + allocatedHintsByZone: map[string]int{"zone-a": 6, "zone-b": 6, "zone-c": 6}, + expectedGivingZones: map[string]int{}, + expectedReceivingZones: map[string]int{}, + }, { + name: "ensure function can handle unexpected data (negative allocations)", + allocations: map[string]Allocation{ + "zone-a": {Desired: -5.0}, + "zone-b": {Desired: -5.0}, + "zone-c": {Desired: -5.0}, + }, + allocatedHintsByZone: map[string]int{"zone-a": 6, "zone-b": 6, "zone-c": 6}, + expectedGivingZones: map[string]int{}, + expectedReceivingZones: map[string]int{}, + }, { + name: "ensure function can handle unexpected data (negative allocated)", + allocations: map[string]Allocation{ + "zone-a": {Desired: 5.0}, + "zone-b": {Desired: 5.0}, + "zone-c": {Desired: 5.0}, + }, + allocatedHintsByZone: map[string]int{"zone-a": -4, "zone-b": -3, "zone-c": -2}, + expectedGivingZones: map[string]int{}, + expectedReceivingZones: map[string]int{}, + }, { + name: "ensure function can handle unexpected data (negative for 1 zone)", + allocations: map[string]Allocation{ + "zone-a": {Desired: 5.0}, + "zone-b": {Desired: 5.0}, + "zone-c": {Desired: 5.0}, + }, + allocatedHintsByZone: map[string]int{"zone-a": -40, "zone-b": 20, "zone-c": 20}, + expectedGivingZones: map[string]int{"zone-b": 15, "zone-c": 15}, + expectedReceivingZones: map[string]int{"zone-a": 30}, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualGivingZones, actualReceivingZones := getGivingAndReceivingZones(tc.allocations, tc.allocatedHintsByZone) + + if !reflect.DeepEqual(actualGivingZones, tc.expectedGivingZones) { + t.Errorf("Expected %+v giving zones, got %+v", tc.expectedGivingZones, actualGivingZones) + } + if !reflect.DeepEqual(actualReceivingZones, tc.expectedReceivingZones) { + t.Errorf("Expected %+v receiving zones, got %+v", tc.expectedReceivingZones, actualReceivingZones) + } + }) + } +} diff --git a/pkg/controller/endpointslice/utils.go b/pkg/controller/endpointslice/utils.go index e98aba699f0..2a698405e64 100644 --- a/pkg/controller/endpointslice/utils.go +++ b/pkg/controller/endpointslice/utils.go @@ -27,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -365,3 +366,31 @@ func getAddressTypesForService(service *corev1.Service) map[discovery.AddressTyp klog.V(2).Infof("couldn't find 
ipfamilies for headless service: %v/%v likely because controller manager is likely connected to an old apiserver that does not support ip families yet. The service endpoint slice will use dual stack families until api-server default it correctly", service.Namespace, service.Name) return serviceSupportedAddresses } + +func unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete []*discovery.EndpointSlice) []*discovery.EndpointSlice { + changedSliceNames := sets.String{} + for _, slice := range slicesToUpdate { + changedSliceNames.Insert(slice.Name) + } + for _, slice := range slicesToDelete { + changedSliceNames.Insert(slice.Name) + } + unchangedSlices := []*discovery.EndpointSlice{} + for _, slice := range existingSlices { + if !changedSliceNames.Has(slice.Name) { + unchangedSlices = append(unchangedSlices, slice) + } + } + + return unchangedSlices +} + +// hintsEnabled returns true if the provided annotations include a +// corev1.AnnotationTopologyAwareHints key with a value set to "auto". +func hintsEnabled(annotations map[string]string) bool { + val, ok := annotations[corev1.AnnotationTopologyAwareHints] + if !ok { + return false + } + return val == "auto" +}
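
Reviewer note: below is a rough, standalone sketch of how the new topologycache pieces are intended to compose. SetNodes derives per-zone CPU ratios from ready Nodes, and AddHints then annotates endpoints with ForZones hints when the distribution allows it. The sketch assumes only the exported surface exercised by the tests in this diff (NewTopologyCache, SetNodes, AddHints, SliceInfo); the newNode helper and the wiring in main are illustrative only and not part of this change. In the controller itself, this path is expected to be gated by hintsEnabled above (corev1.AnnotationTopologyAwareHints set to "auto").

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	discovery "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
	utilpointer "k8s.io/utils/pointer"
)

// newNode builds a minimal ready Node in the given zone with the given
// allocatable CPU. Illustrative helper, not part of the change above.
func newNode(zone, cpu string) *v1.Node {
	return &v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{v1.LabelTopologyZone: zone},
		},
		Status: v1.NodeStatus{
			Allocatable: v1.ResourceList{v1.ResourceCPU: resource.MustParse(cpu)},
			Conditions:  []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
		},
	}
}

func main() {
	// Two zones with equal allocatable CPU; SetNodes should yield 0.5/0.5
	// CPU ratios, mirroring the "2 zones" case in TestSetNodes.
	cache := topologycache.NewTopologyCache()
	cache.SetNodes([]*v1.Node{
		newNode("zone-a", "1000m"),
		newNode("zone-b", "1000m"),
	})

	// One IPv4 slice to create with one endpoint per zone. With balanced
	// ratios, each endpoint should keep a hint for its own zone, as in the
	// "zone ratios only require 2" case of TestAddHints.
	sliceInfo := &topologycache.SliceInfo{
		ServiceKey:  "ns/svc",
		AddressType: discovery.AddressTypeIPv4,
		ToCreate: []*discovery.EndpointSlice{{
			Endpoints: []discovery.Endpoint{
				{Addresses: []string{"10.1.2.3"}, Zone: utilpointer.StringPtr("zone-a")},
				{Addresses: []string{"10.1.2.4"}, Zone: utilpointer.StringPtr("zone-b")},
			},
		}},
	}

	slicesToCreate, _ := cache.AddHints(sliceInfo)
	for _, slice := range slicesToCreate {
		for _, endpoint := range slice.Endpoints {
			fmt.Printf("%v -> %+v\n", endpoint.Addresses, endpoint.Hints)
		}
	}
}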