Adding support for TopologyAwareHints to EndpointSlice Controller

Rob Scott 2021-03-05 12:05:40 -08:00
parent 11f0944dbc
commit 1dcf09c1bf
12 changed files with 2019 additions and 38 deletions


@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
clientset "k8s.io/client-go/kubernetes"
@ -42,7 +43,9 @@ import (
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
endpointslicemetrics "k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
"k8s.io/kubernetes/pkg/features"
)
const (
@ -142,13 +145,6 @@ func NewController(podInformer coreinformers.PodInformer,
c.maxEndpointsPerSlice = maxEndpointsPerSlice
c.reconciler = &reconciler{
client: c.client,
nodeLister: c.nodeLister,
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
endpointSliceTracker: c.endpointSliceTracker,
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
}
c.triggerTimeTracker = endpointutil.NewTriggerTimeTracker()
c.eventBroadcaster = broadcaster
@ -157,6 +153,25 @@ func NewController(podInformer coreinformers.PodInformer,
c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod
c.serviceSelectorCache = endpointutil.NewServiceSelectorCache()
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addNode,
UpdateFunc: c.updateNode,
DeleteFunc: c.deleteNode,
})
c.topologyCache = topologycache.NewTopologyCache()
}
c.reconciler = &reconciler{
client: c.client,
nodeLister: c.nodeLister,
maxEndpointsPerSlice: c.maxEndpointsPerSlice,
endpointSliceTracker: c.endpointSliceTracker,
metricsCache: endpointslicemetrics.NewCache(maxEndpointsPerSlice),
topologyCache: c.topologyCache,
}
return c
}
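For context, the gate check above means the node event handlers and the topology cache are only wired up when the alpha TopologyAwareHints feature gate is enabled. A minimal sketch (my own illustration, not part of this commit) of how a test could flip the gate on before constructing the controller, using the featuregatetesting helper the controller tests below already import:

package endpointslice

import (
	"testing"

	utilfeature "k8s.io/apiserver/pkg/util/feature"
	featuregatetesting "k8s.io/component-base/featuregate/testing"
	"k8s.io/kubernetes/pkg/features"
)

// TestNewControllerWithTopologyGate is a hypothetical test name; it enables the
// gate for the test's duration so NewController registers the node handlers and
// creates a TopologyCache.
func TestNewControllerWithTopologyGate(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.TopologyAwareHints, true)()

	// ... build the controller via NewController and assert c.topologyCache != nil ...
}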
@ -227,6 +242,10 @@ type Controller struct {
// serviceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls
// to AsSelectorPreValidated (see #73527)
serviceSelectorCache *endpointutil.ServiceSelectorCache
// topologyCache tracks the distribution of Nodes and endpoints across zones
// to enable TopologyAwareHints.
topologyCache *topologycache.TopologyCache
}
// Run will not return until stopCh is closed.
@ -275,6 +294,8 @@ func (c *Controller) processNextWorkItem() bool {
}
func (c *Controller) handleErr(err error, key interface{}) {
trackSync(err)
if err == nil {
c.queue.Forget(key)
return
@ -490,3 +511,50 @@ func (c *Controller) deletePod(obj interface{}) {
c.addPod(pod)
}
}
func (c *Controller) addNode(obj interface{}) {
c.checkNodeTopologyDistribution()
}
func (c *Controller) updateNode(old, cur interface{}) {
oldNode := old.(*v1.Node)
curNode := cur.(*v1.Node)
if topologycache.NodeReady(oldNode.Status) != topologycache.NodeReady(curNode.Status) {
c.checkNodeTopologyDistribution()
}
}
func (c *Controller) deleteNode(obj interface{}) {
c.checkNodeTopologyDistribution()
}
// checkNodeTopologyDistribution updates Nodes in the topology cache and then
// queues any Services that are past the threshold.
func (c *Controller) checkNodeTopologyDistribution() {
if c.topologyCache == nil {
return
}
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("Error listing Nodes: %v", err)
}
c.topologyCache.SetNodes(nodes)
serviceKeys := c.topologyCache.GetOverloadedServices()
for _, serviceKey := range serviceKeys {
c.queue.Add(serviceKey)
}
}
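The updateNode handler above calls topologycache.NodeReady, a helper from the topologycache package that is not included in this excerpt. A minimal sketch of what it does, inferred from its usage here (checking the Node's Ready condition):

package topologycache

import v1 "k8s.io/api/core/v1"

// NodeReady reports whether the Node's Ready condition is True. Sketch only;
// the real helper added by this commit is defined elsewhere in the package.
func NodeReady(nodeStatus v1.NodeStatus) bool {
	for _, cond := range nodeStatus.Conditions {
		if cond.Type == v1.NodeReady {
			return cond.Status == v1.ConditionTrue
		}
	}
	return false
}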
// trackSync increments the EndpointSliceSyncs metric with the result of a sync.
func trackSync(err error) {
metricLabel := "success"
if err != nil {
if isStaleInformerCacheErr(err) {
metricLabel = "stale"
} else {
metricLabel = "error"
}
}
endpointslicemetrics.EndpointSliceSyncs.WithLabelValues(metricLabel).Inc()
}


@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -40,6 +41,7 @@ import (
"k8s.io/client-go/tools/cache"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
@ -1501,6 +1503,183 @@ func TestSyncServiceStaleInformer(t *testing.T) {
}
}
func Test_checkNodeTopologyDistribution(t *testing.T) {
zoneA := "zone-a"
zoneB := "zone-b"
zoneC := "zone-c"
readyTrue := true
readyFalse := false
cpu100 := resource.MustParse("100m")
cpu1000 := resource.MustParse("1000m")
cpu2000 := resource.MustParse("2000m")
type nodeInfo struct {
zoneLabel *string
ready *bool
cpu *resource.Quantity
}
testCases := []struct {
name string
nodes []nodeInfo
topologyCacheEnabled bool
endpointZoneInfo map[string]topologycache.EndpointZoneInfo
expectedQueueLen int
}{{
name: "empty",
nodes: []nodeInfo{},
topologyCacheEnabled: false,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{},
expectedQueueLen: 0,
}, {
name: "lopsided, queue required",
nodes: []nodeInfo{
{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu100},
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
},
expectedQueueLen: 1,
}, {
name: "lopsided but 1 unready, queue required because unready node means 0 CPU in one zone",
nodes: []nodeInfo{
{zoneLabel: &zoneA, ready: &readyFalse, cpu: &cpu100},
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneA: 1, zoneB: 2, zoneC: 3},
},
expectedQueueLen: 1,
}, {
name: "even zones, uneven endpoint distribution but within threshold, no sync required",
nodes: []nodeInfo{
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneB: 5, zoneC: 4},
},
expectedQueueLen: 0,
}, {
name: "even zones but node missing zone, sync required",
nodes: []nodeInfo{
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
{ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneB: 5, zoneC: 4},
},
expectedQueueLen: 1,
}, {
name: "even zones but node missing cpu, sync required",
nodes: []nodeInfo{
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneB, ready: &readyTrue},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneB: 5, zoneC: 4},
},
expectedQueueLen: 1,
}, {
name: "even zones, uneven endpoint distribution beyond threshold, sync required",
nodes: []nodeInfo{
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu2000},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneB: 6, zoneC: 4},
},
expectedQueueLen: 1,
}, {
name: "3 uneven zones, matching endpoint distribution, no sync required",
nodes: []nodeInfo{
{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 1},
},
expectedQueueLen: 0,
}, {
name: "3 uneven zones, endpoint distribution within threshold but below 1, sync required",
nodes: []nodeInfo{
{zoneLabel: &zoneA, ready: &readyTrue, cpu: &cpu2000},
{zoneLabel: &zoneB, ready: &readyTrue, cpu: &cpu1000},
{zoneLabel: &zoneC, ready: &readyTrue, cpu: &cpu100},
},
topologyCacheEnabled: true,
endpointZoneInfo: map[string]topologycache.EndpointZoneInfo{
"ns/svc1": {zoneA: 20, zoneB: 10, zoneC: 0},
},
expectedQueueLen: 1,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, esController := newController([]string{}, time.Duration(0))
for i, nodeInfo := range tc.nodes {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("node-%d", i)},
Status: v1.NodeStatus{},
}
if nodeInfo.zoneLabel != nil {
node.Labels = map[string]string{v1.LabelTopologyZone: *nodeInfo.zoneLabel}
}
if nodeInfo.ready != nil {
status := v1.ConditionFalse
if *nodeInfo.ready {
status = v1.ConditionTrue
}
node.Status.Conditions = []v1.NodeCondition{{
Type: v1.NodeReady,
Status: status,
}}
}
if nodeInfo.cpu != nil {
node.Status.Allocatable = v1.ResourceList{
v1.ResourceCPU: *nodeInfo.cpu,
}
}
esController.nodeStore.Add(node)
if tc.topologyCacheEnabled {
esController.topologyCache = topologycache.NewTopologyCache()
for serviceKey, endpointZoneInfo := range tc.endpointZoneInfo {
esController.topologyCache.SetHints(serviceKey, discovery.AddressTypeIPv4, endpointZoneInfo)
}
}
}
esController.checkNodeTopologyDistribution()
if esController.queue.Len() != tc.expectedQueueLen {
t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
}
})
}
}
// Test helpers
func addPods(t *testing.T, esController *endpointSliceController, namespace string, podsCount int) {
t.Helper()


@ -93,6 +93,29 @@ var (
},
[]string{"operation"},
)
// EndpointSlicesChangedPerSync observes the number of EndpointSlices
// changed per sync.
EndpointSlicesChangedPerSync = metrics.NewHistogramVec(
&metrics.HistogramOpts{
Subsystem: EndpointSliceSubsystem,
Name: "endpointslices_changed_per_sync",
Help: "Number of EndpointSlices changed on each Service sync",
},
[]string{"topology"}, // either "auto" or "disabled"
)
// EndpointSliceSyncs tracks the number of sync operations the controller
// runs along with their result.
EndpointSliceSyncs = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: EndpointSliceSubsystem,
Name: "syncs",
Help: "Number of EndpointSlice syncs",
StabilityLevel: metrics.ALPHA,
},
[]string{"result"}, // either "success", "stale", or "error"
)
)
var registerMetrics sync.Once
@ -106,5 +129,7 @@ func RegisterMetrics() {
legacyregistry.MustRegister(NumEndpointSlices)
legacyregistry.MustRegister(DesiredEndpointSlices)
legacyregistry.MustRegister(EndpointSliceChanges)
legacyregistry.MustRegister(EndpointSlicesChangedPerSync)
legacyregistry.MustRegister(EndpointSliceSyncs)
})
}


@ -32,7 +32,9 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
endpointutil "k8s.io/kubernetes/pkg/controller/util/endpoint"
"k8s.io/kubernetes/pkg/features"
)
@ -45,6 +47,9 @@ type reconciler struct {
maxEndpointsPerSlice int32
endpointSliceTracker *endpointSliceTracker
metricsCache *metrics.Cache
// topologyCache tracks the distribution of Nodes and endpoints across zones
// to enable TopologyAwareHints.
topologyCache *topologycache.TopologyCache
}
// endpointMeta includes the attributes we group slices on, this type helps with
@ -73,6 +78,15 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
for _, existingSlice := range existingSlices {
// service no longer supports that address type, add it to deleted slices
if _, ok := serviceSupportedAddressesTypes[existingSlice.AddressType]; !ok {
if r.topologyCache != nil {
svcKey, err := serviceControllerKey(existingSlice)
if err != nil {
klog.Warningf("Couldn't get key to remove EndpointSlice from topology cache %+v: %v", existingSlice, err)
} else {
r.topologyCache.RemoveHints(svcKey, existingSlice.AddressType)
}
}
slicesToDelete = append(slicesToDelete, existingSlice)
continue
}
@ -222,6 +236,25 @@ func (r *reconciler) reconcileByAddressType(service *corev1.Service, pods []*cor
serviceNN := types.NamespacedName{Name: service.Name, Namespace: service.Namespace}
r.metricsCache.UpdateServicePortCache(serviceNN, spMetrics)
// Topology hints are assigned per address type. This means it is
// theoretically possible for endpoints of one address type to be assigned
// hints while endpoints of another address type are not.
si := &topologycache.SliceInfo{
ServiceKey: fmt.Sprintf("%s/%s", service.Namespace, service.Name),
ToCreate: slicesToCreate,
ToUpdate: slicesToUpdate,
Unchanged: unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete),
}
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
slicesToCreate, slicesToUpdate = r.topologyCache.AddHints(si)
} else {
if r.topologyCache != nil {
r.topologyCache.RemoveHints(si.ServiceKey, addressType)
}
slicesToCreate, slicesToUpdate = topologycache.RemoveHintsFromSlices(si)
}
return r.finalize(service, slicesToCreate, slicesToUpdate, slicesToDelete, triggerTime)
}
@ -297,6 +330,14 @@ func (r *reconciler) finalize(
metrics.EndpointSliceChanges.WithLabelValues("delete").Inc()
}
topologyLabel := "disabled"
if r.topologyCache != nil && hintsEnabled(service.Annotations) {
topologyLabel = "auto"
}
numSlicesChanged := len(slicesToCreate) + len(slicesToUpdate) + len(slicesToDelete)
metrics.EndpointSlicesChangedPerSync.WithLabelValues(topologyLabel).Observe(float64(numSlicesChanged))
return nil
}
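The hintsEnabled check above gates hint assignment on a per-Service annotation; the tests later in this diff set it through corev1.AnnotationTopologyAwareHints with the values "auto" and "disabled". A small illustrative sketch (not from the commit) of opting a Service in:

package example

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newHintedService returns a Service annotated so the reconciler above will ask
// the TopologyCache to add zone hints to its EndpointSlices.
func newHintedService(name, namespace string) *corev1.Service {
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Annotations: map[string]string{
				// Same annotation and value used by TestReconcileTopology below.
				corev1.AnnotationTopologyAwareHints: "auto",
			},
		},
	}
}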


@ -29,6 +29,7 @@ import (
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/intstr"
@ -41,6 +42,7 @@ import (
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/endpointslice/metrics"
"k8s.io/kubernetes/pkg/controller/endpointslice/topologycache"
"k8s.io/kubernetes/pkg/features"
utilpointer "k8s.io/utils/pointer"
)
@ -66,7 +68,7 @@ func TestReconcileEmpty(t *testing.T) {
assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
expectTrackedGeneration(t, r.endpointSliceTracker, &slices[0], 1)
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 1})
}
// Given a single pod matching a service selector and no existing endpoint slices,
@ -436,16 +438,22 @@ func TestReconcile1Pod(t *testing.T) {
expectTrackedGeneration(t, r.endpointSliceTracker, &slice, 1)
expectSlicesChangedPerSync := 1
if testCase.service.Spec.IPFamilies != nil && len(testCase.service.Spec.IPFamilies) > 0 {
expectSlicesChangedPerSync = len(testCase.service.Spec.IPFamilies)
}
expectMetrics(t,
expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 1,
addedPerSync: len(testCase.expectedEndpointPerSlice),
removedPerSync: 0,
numCreated: len(testCase.expectedEndpointPerSlice),
numUpdated: 0,
numDeleted: 0})
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 1,
addedPerSync: len(testCase.expectedEndpointPerSlice),
removedPerSync: 0,
numCreated: len(testCase.expectedEndpointPerSlice),
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: expectSlicesChangedPerSync,
})
}
})
}
@ -478,7 +486,7 @@ func TestReconcile1EndpointSlice(t *testing.T) {
assert.EqualValues(t, []discovery.EndpointPort{}, slices[0].Ports)
assert.EqualValues(t, []discovery.Endpoint{}, slices[0].Endpoints)
expectTrackedGeneration(t, r.endpointSliceTracker, &slices[0], 1)
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 0, addedPerSync: 0, removedPerSync: 0, numCreated: 0, numUpdated: 1, numDeleted: 0, slicesChangedPerSync: 1})
}
// when a Service has PublishNotReadyAddresses set to true, corresponding
@ -539,7 +547,7 @@ func TestReconcileManyPods(t *testing.T) {
// Two endpoint slices should be completely full, the remainder should be in another one
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 50})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 250, removedPerSync: 0, numCreated: 3, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 3})
}
// now with preexisting slices, we have 250 pods matching a service
@ -590,7 +598,7 @@ func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
// 1 new slice (0->100) + 1 updated slice (62->89)
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{89, 61, 100})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 250, addedPerSync: 127, removedPerSync: 0, numCreated: 1, numUpdated: 1, numDeleted: 0, slicesChangedPerSync: 2})
// ensure cache mutation has not occurred
cmc.Check(t)
@ -645,7 +653,7 @@ func TestReconcileEndpointSlicesSomePreexistingWorseAllocation(t *testing.T) {
// 2 new slices (100, 52) in addition to existing slices (74, 74)
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{74, 74, 100, 52})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 4, desiredEndpoints: 300, addedPerSync: 152, removedPerSync: 0, numCreated: 2, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 2})
// ensure cache mutation has not occurred
cmc.Check(t)
@ -804,7 +812,7 @@ func TestReconcileEndpointSlicesRecycling(t *testing.T) {
// thanks to recycling, we get a free repack of endpoints, resulting in 3 full slices instead of 10 mostly empty slices
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{100, 100, 100})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7})
expectMetrics(t, expectedMetrics{desiredSlices: 3, actualSlices: 3, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 0, numUpdated: 3, numDeleted: 7, slicesChangedPerSync: 10})
// ensure cache mutation has not occurred
cmc.Check(t)
@ -861,7 +869,7 @@ func TestReconcileEndpointSlicesUpdatePacking(t *testing.T) {
// ensure that both endpoint slices have been updated
expectActions(t, client.Actions(), 2, "update", "endpointslices")
expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 115, addedPerSync: 15, removedPerSync: 0, numCreated: 0, numUpdated: 2, numDeleted: 0, slicesChangedPerSync: 2})
// additional pods should get added to fuller slice
expectUnorderedSlicesWithLengths(t, fetchEndpointSlices(t, client, namespace), []int{95, 20})
@ -1036,7 +1044,7 @@ func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
// reconcile should create 5 endpoint slices
assert.Equal(t, 5, len(client.Actions()), "Expected 5 client actions as part of reconcile")
expectActions(t, client.Actions(), 5, "create", "endpointslices")
expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 300, addedPerSync: 300, removedPerSync: 0, numCreated: 5, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 5})
fetchedSlices := fetchEndpointSlices(t, client, namespace)
@ -1082,23 +1090,23 @@ func TestReconcileMaxEndpointsPerSlice(t *testing.T) {
{
maxEndpointsPerSlice: int32(50),
expectedSliceLengths: []int{50, 50, 50, 50, 50},
expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5},
expectedMetricValues: expectedMetrics{desiredSlices: 5, actualSlices: 5, desiredEndpoints: 250, addedPerSync: 250, numCreated: 5, slicesChangedPerSync: 5},
}, {
maxEndpointsPerSlice: int32(80),
expectedSliceLengths: []int{80, 80, 80, 10},
expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4},
expectedMetricValues: expectedMetrics{desiredSlices: 4, actualSlices: 4, desiredEndpoints: 250, addedPerSync: 250, numCreated: 4, slicesChangedPerSync: 4},
}, {
maxEndpointsPerSlice: int32(150),
expectedSliceLengths: []int{150, 100},
expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2},
expectedMetricValues: expectedMetrics{desiredSlices: 2, actualSlices: 2, desiredEndpoints: 250, addedPerSync: 250, numCreated: 2, slicesChangedPerSync: 2},
}, {
maxEndpointsPerSlice: int32(250),
expectedSliceLengths: []int{250},
expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1, slicesChangedPerSync: 1},
}, {
maxEndpointsPerSlice: int32(500),
expectedSliceLengths: []int{250},
expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1},
expectedMetricValues: expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 250, addedPerSync: 250, numCreated: 1, slicesChangedPerSync: 1},
},
}
@ -1133,11 +1141,11 @@ func TestReconcileEndpointSlicesMetrics(t *testing.T) {
assert.Equal(t, 1, len(actions), "Expected 1 additional client actions as part of reconcile")
assert.True(t, actions[0].Matches("create", "endpointslices"), "First action should be create endpoint slice")
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 20, addedPerSync: 20, removedPerSync: 0, numCreated: 1, numUpdated: 0, numDeleted: 0, slicesChangedPerSync: 1})
fetchedSlices := fetchEndpointSlices(t, client, namespace)
reconcileHelper(t, r, &svc, pods[0:10], []*discovery.EndpointSlice{&fetchedSlices[0]}, time.Now())
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0})
expectMetrics(t, expectedMetrics{desiredSlices: 1, actualSlices: 1, desiredEndpoints: 10, addedPerSync: 20, removedPerSync: 10, numCreated: 1, numUpdated: 1, numDeleted: 0, slicesChangedPerSync: 2})
}
// When a Service has a non-nil deletionTimestamp we want to avoid creating any
@ -1310,6 +1318,271 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
}
}
func TestReconcileTopology(t *testing.T) {
ns := "testing"
svc, endpointMeta := newServiceAndEndpointMeta("foo", ns)
// 3 zones, 10 nodes and pods per zone
zones := []string{"zone-a", "zone-b", "zone-c"}
pods := []*corev1.Pod{}
nodes := []*corev1.Node{}
nodesByName := map[string]*corev1.Node{}
for i, zone := range zones {
for j := 0; j < 10; j++ {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("node-%s-%d", zone, j),
Labels: map[string]string{
corev1.LabelTopologyZone: zone,
},
},
Status: corev1.NodeStatus{
Conditions: []corev1.NodeCondition{{
Type: corev1.NodeReady,
Status: corev1.ConditionTrue,
}},
Allocatable: corev1.ResourceList{"cpu": resource.MustParse("100m")},
},
}
nodesByName[node.Name] = node
nodes = append(nodes, node)
pod := newPod(i*100+j, ns, true, 1, false)
pod.Spec.NodeName = node.Name
pods = append(pods, pod)
}
}
slicesByName := map[string]*discovery.EndpointSlice{}
slicePods := map[string][]*corev1.Pod{
"zone-a-b": {pods[7], pods[8], pods[16], pods[17], pods[18]},
"zone-a-c": {pods[5], pods[6], pods[25], pods[26]},
"zone-c": {pods[27], pods[28], pods[29]},
}
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
ownerRef := metav1.NewControllerRef(&svc, gvk)
for name, pods := range slicePods {
endpoints := []discovery.Endpoint{}
for _, pod := range pods {
endpoints = append(endpoints, podToEndpoint(pod, nodesByName[pod.Spec.NodeName], &svc, endpointMeta.AddressType))
}
slicesByName[name] = &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: name,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
Labels: map[string]string{
discovery.LabelManagedBy: controllerName,
discovery.LabelServiceName: svc.Name,
},
},
AddressType: endpointMeta.AddressType,
Ports: endpointMeta.Ports,
Endpoints: endpoints,
}
}
testCases := []struct {
name string
topologyCacheEnabled bool
hintsAnnotation string
existingSlices []*discovery.EndpointSlice
pods []*corev1.Pod
nodes []*corev1.Node
expectedHints map[string]int
expectedCrossZoneHints int
expectedMetrics expectedMetrics
}{{
name: "no change, topologyCache disabled, annotation == auto",
topologyCacheEnabled: false,
hintsAnnotation: "auto",
existingSlices: []*discovery.EndpointSlice{slicesByName["zone-c"]},
pods: slicePods["zone-c"],
nodes: nodes,
expectedHints: nil,
expectedCrossZoneHints: 0,
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 3,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 0,
},
}, {
name: "enabling topologyCache, hintsAnnotation == auto",
topologyCacheEnabled: true,
hintsAnnotation: "auto",
existingSlices: []*discovery.EndpointSlice{slicesByName["zone-c"]},
pods: slicePods["zone-c"],
nodes: nodes,
expectedHints: map[string]int{
"zone-a": 1,
"zone-b": 1,
"zone-c": 1,
},
expectedCrossZoneHints: 2,
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 3,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 1,
numDeleted: 0,
slicesChangedPerSyncTopology: 1,
},
}, {
name: "topology enabled, hintsAnnotation==auto, ratio beyond threshold",
topologyCacheEnabled: true,
hintsAnnotation: "auto",
existingSlices: []*discovery.EndpointSlice{slicesByName["zone-a-c"]},
pods: slicePods["zone-a-c"],
nodes: nodes,
expectedHints: nil,
expectedCrossZoneHints: 0,
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 4,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSyncTopology: 0,
},
}, {
name: "topology enabled, hintsAnnotation==auto, more slices and endpoints",
topologyCacheEnabled: true,
hintsAnnotation: "auto",
existingSlices: []*discovery.EndpointSlice{slicesByName["zone-a-c"], slicesByName["zone-a-b"]},
pods: append(slicePods["zone-a-c"], slicePods["zone-a-b"]...),
nodes: nodes,
expectedHints: map[string]int{
"zone-a": 3,
"zone-b": 3,
"zone-c": 3,
},
expectedCrossZoneHints: 1,
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 9,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
// TODO(robscott): Since we're potentially changing more slices when
// adding topology hints we could use it as a free repacking
// opportunity. That would make this value 1.
numUpdated: 2,
numDeleted: 0,
slicesChangedPerSyncTopology: 2,
},
}, {
name: "topology enabled, hintsAnnotation==disabled, more slices and endpoints",
topologyCacheEnabled: true,
hintsAnnotation: "disabled",
existingSlices: []*discovery.EndpointSlice{slicesByName["zone-a-c"], slicesByName["zone-a-b"]},
pods: append(slicePods["zone-a-c"], slicePods["zone-a-b"]...),
nodes: nodes,
expectedHints: nil,
expectedCrossZoneHints: 0,
expectedMetrics: expectedMetrics{
desiredSlices: 1,
actualSlices: 1,
desiredEndpoints: 9,
addedPerSync: 0,
removedPerSync: 0,
numCreated: 0,
numUpdated: 0,
numDeleted: 0,
slicesChangedPerSync: 0,
},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client := newClientset()
cmc := newCacheMutationCheck(tc.existingSlices)
createEndpointSlices(t, client, ns, tc.existingSlices)
setupMetrics()
r := newReconciler(client, tc.nodes, defaultMaxEndpointsPerSlice)
if tc.topologyCacheEnabled {
r.topologyCache = topologycache.NewTopologyCache()
r.topologyCache.SetNodes(tc.nodes)
}
service := svc.DeepCopy()
service.Annotations = map[string]string{
corev1.AnnotationTopologyAwareHints: tc.hintsAnnotation,
}
r.reconcile(service, tc.pods, tc.existingSlices, time.Now())
cmc.Check(t)
expectMetrics(t, tc.expectedMetrics)
fetchedSlices := fetchEndpointSlices(t, client, ns)
if tc.expectedHints == nil {
for _, slice := range fetchedSlices {
for _, endpoint := range slice.Endpoints {
if endpoint.Hints != nil && len(endpoint.Hints.ForZones) > 0 {
t.Fatalf("Expected endpoint not to have zone hints: %+v", endpoint)
}
}
}
return
}
actualCrossZoneHints := 0
actualHints := map[string]int{}
for _, slice := range fetchedSlices {
for _, endpoint := range slice.Endpoints {
if endpoint.Hints == nil || len(endpoint.Hints.ForZones) == 0 {
t.Fatalf("Expected endpoint to have zone hints: %+v", endpoint)
}
if len(endpoint.Hints.ForZones) > 1 {
t.Fatalf("Expected endpoint to only have 1 zone hint, got %d", len(endpoint.Hints.ForZones))
}
if endpoint.Zone == nil || *endpoint.Zone == "" {
t.Fatalf("Expected endpoint to have zone: %+v", endpoint)
}
zoneHint := endpoint.Hints.ForZones[0].Name
if *endpoint.Zone != zoneHint {
actualCrossZoneHints++
}
actualHints[zoneHint]++
}
}
if len(actualHints) != len(tc.expectedHints) {
t.Errorf("Expected hints for %d zones, got %d", len(tc.expectedHints), len(actualHints))
}
for zone, expectedNum := range tc.expectedHints {
actualNum, _ := actualHints[zone]
if actualNum != expectedNum {
t.Errorf("Expected %d hints for %s zone, got %d", expectedNum, zone, actualNum)
}
}
if actualCrossZoneHints != tc.expectedCrossZoneHints {
t.Errorf("Expected %d cross zone hints, got %d", tc.expectedCrossZoneHints, actualCrossZoneHints)
}
})
}
}
// Test Helpers
func newReconciler(client *fake.Clientset, nodes []*corev1.Node, maxEndpointsPerSlice int32) *reconciler {
@ -1446,14 +1719,18 @@ func reconcileHelper(t *testing.T, r *reconciler, service *corev1.Service, pods
// Metrics helpers
type expectedMetrics struct {
desiredSlices int
actualSlices int
desiredEndpoints int
addedPerSync int
removedPerSync int
numCreated int
numUpdated int
numDeleted int
desiredSlices int
actualSlices int
desiredEndpoints int
addedPerSync int
removedPerSync int
numCreated int
numUpdated int
numDeleted int
slicesChangedPerSync int
slicesChangedPerSyncTopology int
syncSuccesses int
syncErrors int
}
func expectMetrics(t *testing.T, em expectedMetrics) {
@ -1506,6 +1783,30 @@ func expectMetrics(t *testing.T, em expectedMetrics) {
if actualDeleted != float64(em.numDeleted) {
t.Errorf("Expected endpointSliceChangesDeleted to be %d, got %v", em.numDeleted, actualDeleted)
}
actualSlicesChangedPerSync, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("disabled"))
handleErr(t, err, "slicesChangedPerSync")
if actualSlicesChangedPerSync != float64(em.slicesChangedPerSync) {
t.Errorf("Expected slicesChangedPerSync to be %d, got %v", em.slicesChangedPerSync, actualSlicesChangedPerSync)
}
actualSlicesChangedPerSyncTopology, err := testutil.GetHistogramMetricValue(metrics.EndpointSlicesChangedPerSync.WithLabelValues("auto"))
handleErr(t, err, "slicesChangedPerSyncTopology")
if actualSlicesChangedPerSyncTopology != float64(em.slicesChangedPerSyncTopology) {
t.Errorf("Expected slicesChangedPerSyncTopology to be %d, got %v", em.slicesChangedPerSyncTopology, actualSlicesChangedPerSyncTopology)
}
actualSyncSuccesses, err := testutil.GetCounterMetricValue(metrics.EndpointSliceSyncs.WithLabelValues("success"))
handleErr(t, err, "syncSuccesses")
if actualSyncSuccesses != float64(em.syncSuccesses) {
t.Errorf("Expected endpointSliceSyncSuccesses to be %d, got %v", em.syncSuccesses, actualSyncSuccesses)
}
actualSyncErrors, err := testutil.GetCounterMetricValue(metrics.EndpointSliceSyncs.WithLabelValues("error"))
handleErr(t, err, "syncErrors")
if actualSyncErrors != float64(em.syncErrors) {
t.Errorf("Expected endpointSliceSyncErrors to be %d, got %v", em.syncErrors, actualSyncErrors)
}
}
func handleErr(t *testing.T, err error, metricName string) {
@ -1524,4 +1825,8 @@ func setupMetrics() {
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "create"})
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "update"})
metrics.EndpointSliceChanges.Delete(map[string]string{"operation": "delete"})
metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "disabled"})
metrics.EndpointSlicesChangedPerSync.Delete(map[string]string{"topology": "auto"})
metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "success"})
metrics.EndpointSliceSyncs.Delete(map[string]string{"result": "error"})
}


@ -0,0 +1,76 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologycache
import (
discovery "k8s.io/api/discovery/v1"
)
// SliceInfo stores information about EndpointSlices for the reconciliation
// process.
type SliceInfo struct {
ServiceKey string
AddressType discovery.AddressType
ToCreate []*discovery.EndpointSlice
ToUpdate []*discovery.EndpointSlice
Unchanged []*discovery.EndpointSlice
}
func (si *SliceInfo) getTotalEndpoints() int {
totalEndpoints := 0
for _, slice := range si.ToCreate {
totalEndpoints += len(slice.Endpoints)
}
for _, slice := range si.ToUpdate {
totalEndpoints += len(slice.Endpoints)
}
for _, slice := range si.Unchanged {
totalEndpoints += len(slice.Endpoints)
}
return totalEndpoints
}
// getAllocatedHintsByZone sums up the allocated hints we currently have in
// unchanged slices and marks slices for update as necessary. A slice needs to
// be updated if any of the following are true:
// - It has an endpoint without zone hints
// - It has an endpoint hint for a zone that no longer needs any
// - It has endpoint hints that would make the minimum allocations necessary
// impossible with changes to slices that are already being updated or
// created.
func (si *SliceInfo) getAllocatedHintsByZone(allocations map[string]Allocation) EndpointZoneInfo {
allocatedHintsByZone := EndpointZoneInfo{}
// Using filtering in place to remove any endpoints that are no longer
// unchanged (https://github.com/golang/go/wiki/SliceTricks#filter-in-place)
j := 0
for _, slice := range si.Unchanged {
hintsByZone := getHintsByZone(slice, allocatedHintsByZone, allocations)
if hintsByZone == nil {
si.ToUpdate = append(si.ToUpdate, slice.DeepCopy())
} else {
si.Unchanged[j] = slice
j++
for zone, numHints := range hintsByZone {
allocatedHintsByZone[zone] += numHints
}
}
}
si.Unchanged = si.Unchanged[:j]
return allocatedHintsByZone
}
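The loop above uses the filter-in-place slice idiom linked in the comment: surviving elements are copied forward over the same backing array and the slice is truncated to the number kept. A tiny standalone illustration of the same idiom (my own, not from the commit):

package example

// keepNonEmpty filters a slice in place, reusing its backing array, the same
// way getAllocatedHintsByZone compacts si.Unchanged above.
func keepNonEmpty(items []string) []string {
	j := 0
	for _, item := range items {
		if item != "" {
			items[j] = item
			j++
		}
	}
	return items[:j]
}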


@ -0,0 +1,79 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologycache
import (
"fmt"
"testing"
discovery "k8s.io/api/discovery/v1"
)
func TestGetTotalEndpoints(t *testing.T) {
testCases := []struct {
name string
si *SliceInfo
expectedTotal int
}{{
name: "empty",
si: &SliceInfo{},
expectedTotal: 0,
}, {
name: "empty slice",
si: &SliceInfo{
ToCreate: []*discovery.EndpointSlice{sliceWithNEndpoints(0)},
},
expectedTotal: 0,
}, {
name: "multiple slices",
si: &SliceInfo{
ToCreate: []*discovery.EndpointSlice{sliceWithNEndpoints(15), sliceWithNEndpoints(8)},
},
expectedTotal: 23,
}, {
name: "slices for all",
si: &SliceInfo{
ToCreate: []*discovery.EndpointSlice{sliceWithNEndpoints(15), sliceWithNEndpoints(8)},
ToUpdate: []*discovery.EndpointSlice{sliceWithNEndpoints(2)},
Unchanged: []*discovery.EndpointSlice{sliceWithNEndpoints(100), sliceWithNEndpoints(90)},
},
expectedTotal: 215,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualTotal := tc.si.getTotalEndpoints()
if actualTotal != tc.expectedTotal {
t.Errorf("Expected %d, got %d", tc.expectedTotal, actualTotal)
}
})
}
}
// helpers
func sliceWithNEndpoints(n int) *discovery.EndpointSlice {
endpoints := []discovery.Endpoint{}
for i := 0; i < n; i++ {
endpoints = append(endpoints, discovery.Endpoint{Addresses: []string{fmt.Sprintf("10.1.2.%d", i)}})
}
return &discovery.EndpointSlice{
Endpoints: endpoints,
}
}


@ -0,0 +1,252 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologycache
import (
"math"
"sync"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
)
const (
// OverloadThreshold represents the maximum overload any individual endpoint
// should be exposed to.
OverloadThreshold float64 = 0.2
)
// TopologyCache tracks the distribution of Nodes and endpoints across zones.
type TopologyCache struct {
lock sync.Mutex
sufficientNodeInfo bool
cpuByZone map[string]*resource.Quantity
cpuRatiosByZone map[string]float64
endpointsByService map[string]map[discovery.AddressType]EndpointZoneInfo
}
// EndpointZoneInfo tracks the distribution of endpoints across zones for a
// Service.
type EndpointZoneInfo map[string]int
// Allocation describes the number of endpoints that should be allocated for a
// zone.
type Allocation struct {
Minimum int
Maximum int
Desired float64
}
// NewTopologyCache initializes a new TopologyCache.
func NewTopologyCache() *TopologyCache {
return &TopologyCache{
cpuByZone: map[string]*resource.Quantity{},
cpuRatiosByZone: map[string]float64{},
endpointsByService: map[string]map[discovery.AddressType]EndpointZoneInfo{},
}
}
// GetOverloadedServices returns a list of Service keys that refer to Services
// that have crossed the overload threshold for any zone.
func (t *TopologyCache) GetOverloadedServices() []string {
t.lock.Lock()
defer t.lock.Unlock()
svcKeys := []string{}
for svcKey, eziByAddrType := range t.endpointsByService {
for _, ezi := range eziByAddrType {
if serviceOverloaded(ezi, t.cpuRatiosByZone) {
svcKeys = append(svcKeys, svcKey)
break
}
}
}
return svcKeys
}
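serviceOverloaded is defined later in this file, past the end of this excerpt. Based on the behavior pinned down by Test_checkNodeTopologyDistribution earlier in the diff, here is a rough reconstruction of the kind of check it performs; treat it as an approximation, not the literal implementation:

package topologycache

import "math"

// serviceOverloadedSketch returns true when any zone's share of a Service's
// endpoints falls below the minimum implied by the zone CPU ratios and the
// OverloadThreshold. Hypothetical reconstruction for illustration only.
func serviceOverloadedSketch(ezi EndpointZoneInfo, zoneRatios map[string]float64) bool {
	if len(zoneRatios) == 0 {
		// With no usable zone ratios, hints can no longer be kept balanced, so
		// any Service that currently has hints should be resynced (this matches
		// the "node missing zone/cpu" test cases, which expect a queued Service).
		return len(ezi) > 0
	}

	totalEndpoints := 0
	for _, num := range ezi {
		totalEndpoints += num
	}

	for zone, num := range ezi {
		ratio, ok := zoneRatios[zone]
		if !ok {
			return true
		}
		minEndpoints := int(math.Ceil(ratio * float64(totalEndpoints) / (1 + OverloadThreshold)))
		if num < minEndpoints {
			return true
		}
	}
	return false
}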
// AddHints adds or updates topology hints on EndpointSlices and returns updated
// lists of EndpointSlices to create and update.
func (t *TopologyCache) AddHints(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice) {
totalEndpoints := si.getTotalEndpoints()
allocations := t.getAllocations(totalEndpoints)
if allocations == nil {
klog.V(2).Infof("Insufficient endpoints, removing hints from %s Service", si.ServiceKey)
t.RemoveHints(si.ServiceKey, si.AddressType)
return RemoveHintsFromSlices(si)
}
allocatedHintsByZone := si.getAllocatedHintsByZone(allocations)
allocatableSlices := si.ToCreate
for _, slice := range si.ToUpdate {
allocatableSlices = append(allocatableSlices, slice)
}
// step 1: assign same-zone hints for all endpoints as a starting point.
for _, slice := range allocatableSlices {
for i, endpoint := range slice.Endpoints {
if endpoint.Zone == nil || *endpoint.Zone == "" {
klog.Warningf("Endpoint found without zone specified, removing hints from %s Service", si.ServiceKey)
t.RemoveHints(si.ServiceKey, si.AddressType)
return RemoveHintsFromSlices(si)
}
allocatedHintsByZone[*endpoint.Zone]++
slice.Endpoints[i].Hints = &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: *endpoint.Zone}}}
}
}
// step 2. Identify which zones need to donate slices and which need more.
givingZones, receivingZones := getGivingAndReceivingZones(allocations, allocatedHintsByZone)
// step 3. Redistribute endpoints based on data from step 2.
redistributions := redistributeHints(allocatableSlices, givingZones, receivingZones)
for zone, diff := range redistributions {
allocatedHintsByZone[zone] += diff
}
t.SetHints(si.ServiceKey, si.AddressType, allocatedHintsByZone)
return si.ToCreate, si.ToUpdate
}
// SetHints sets topology hints for the provided serviceKey and addrType in this
// cache.
func (t *TopologyCache) SetHints(serviceKey string, addrType discovery.AddressType, allocatedHintsByZone EndpointZoneInfo) {
if len(allocatedHintsByZone) == 0 {
t.RemoveHints(serviceKey, addrType)
return
}
t.lock.Lock()
defer t.lock.Unlock()
_, ok := t.endpointsByService[serviceKey]
if !ok {
t.endpointsByService[serviceKey] = map[discovery.AddressType]EndpointZoneInfo{}
}
t.endpointsByService[serviceKey][addrType] = allocatedHintsByZone
}
// RemoveHints removes topology hints for the provided serviceKey and addrType
// from this cache.
func (t *TopologyCache) RemoveHints(serviceKey string, addrType discovery.AddressType) {
t.lock.Lock()
defer t.lock.Unlock()
_, ok := t.endpointsByService[serviceKey]
if ok {
delete(t.endpointsByService[serviceKey], addrType)
}
if len(t.endpointsByService[serviceKey]) == 0 {
delete(t.endpointsByService, serviceKey)
}
}
// SetNodes updates the Node distribution for the TopologyCache.
func (t *TopologyCache) SetNodes(nodes []*v1.Node) {
cpuByZone := map[string]*resource.Quantity{}
sufficientNodeInfo := true
totalCPU := resource.Quantity{}
for _, node := range nodes {
if !NodeReady(node.Status) {
continue
}
nodeCPU := node.Status.Allocatable.Cpu()
zone, ok := node.Labels[v1.LabelTopologyZone]
// TODO(robscott): Figure out if there's an acceptable proportion of
// nodes with inadequate information. The current logic means that as
// soon as we find any node without a zone or allocatable CPU specified,
// we bail out entirely. Bailing out at this level will make our cluster
// wide ratios nil, which would result in slices for all Services having
// their hints removed.
if !ok || zone == "" || nodeCPU.IsZero() {
cpuByZone = map[string]*resource.Quantity{}
sufficientNodeInfo = false
break
}
totalCPU.Add(*nodeCPU)
if _, ok = cpuByZone[zone]; !ok {
cpuByZone[zone] = nodeCPU
} else {
cpuByZone[zone].Add(*nodeCPU)
}
}
t.lock.Lock()
defer t.lock.Unlock()
if totalCPU.IsZero() || !sufficientNodeInfo || len(cpuByZone) < 2 {
t.sufficientNodeInfo = false
t.cpuByZone = nil
t.cpuRatiosByZone = nil
} else {
t.sufficientNodeInfo = sufficientNodeInfo
t.cpuByZone = cpuByZone
t.cpuRatiosByZone = map[string]float64{}
for zone, cpu := range cpuByZone {
t.cpuRatiosByZone[zone] = float64(cpu.MilliValue()) / float64(totalCPU.MilliValue())
}
}
}
// getAllocations returns a set of minimum and maximum allocations per zone. If
// it is not possible to provide allocations that are below the overload
// threshold, a nil value will be returned.
func (t *TopologyCache) getAllocations(numEndpoints int) map[string]Allocation {
if t.cpuRatiosByZone == nil || len(t.cpuRatiosByZone) < 2 || len(t.cpuRatiosByZone) > numEndpoints {
return nil
}
t.lock.Lock()
defer t.lock.Unlock()
remainingMinEndpoints := numEndpoints
minTotal := 0
allocations := map[string]Allocation{}
for zone, ratio := range t.cpuRatiosByZone {
desired := ratio * float64(numEndpoints)
minimum := int(math.Ceil(desired * (1 / (1 + OverloadThreshold))))
allocations[zone] = Allocation{
Minimum: minimum,
Desired: math.Max(desired, float64(minimum)),
}
minTotal += minimum
remainingMinEndpoints -= minimum
if remainingMinEndpoints < 0 {
return nil
}
}
for zone, allocation := range allocations {
allocation.Maximum = allocation.Minimum + numEndpoints - minTotal
allocations[zone] = allocation
}
return allocations
}
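To make the allocation arithmetic concrete (numbers are my own, not from the commit): with cpuRatiosByZone of {"zone-a": 0.45, "zone-b": 0.55} and 2 endpoints, desired is 0.9 and 1.1, Minimum is ceil(desired / 1.2) = 1 for each zone, minTotal is 2, and Maximum = Minimum + numEndpoints - minTotal = 1, so each zone must (and may) receive exactly one hinted endpoint. A sketch written as if inside this package:

package topologycache

// allocationExample exercises getAllocations with assumed ratios to show the
// resulting per-zone values. Illustrative only; not part of the commit.
func allocationExample() map[string]Allocation {
	cache := NewTopologyCache()
	cache.cpuRatiosByZone = map[string]float64{"zone-a": 0.45, "zone-b": 0.55}

	// Returns:
	//   zone-a: {Minimum: 1, Maximum: 1, Desired: 1.0} (Desired raised to Minimum)
	//   zone-b: {Minimum: 1, Maximum: 1, Desired: 1.1}
	// With three zones and only two endpoints it would return nil instead, which
	// AddHints treats as "insufficient endpoints" and removes hints entirely.
	return cache.getAllocations(2)
}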


@ -0,0 +1,486 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologycache
import (
"reflect"
"testing"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilpointer "k8s.io/utils/pointer"
)
func TestAddHints(t *testing.T) {
testCases := []struct {
name string
cpuRatiosByZone map[string]float64
sliceInfo *SliceInfo
expectedEndpointsByAddrType map[discovery.AddressType]EndpointZoneInfo
expectedSlicesToCreate []*discovery.EndpointSlice
expectedSlicesToUpdate []*discovery.EndpointSlice
}{{
name: "empty",
cpuRatiosByZone: nil,
sliceInfo: &SliceInfo{
ServiceKey: "ns/svc",
AddressType: discovery.AddressTypeIPv4,
},
expectedEndpointsByAddrType: nil,
expectedSlicesToCreate: []*discovery.EndpointSlice{},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
}, {
name: "slice to create, no zone ratios",
cpuRatiosByZone: nil,
sliceInfo: &SliceInfo{
ServiceKey: "ns/svc",
AddressType: discovery.AddressTypeIPv4,
ToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
}},
}},
},
expectedEndpointsByAddrType: nil,
expectedSlicesToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
}, {
name: "slice to create with 2 endpoints, zone ratios require 3",
cpuRatiosByZone: map[string]float64{
"zone-a": 0.3,
"zone-b": 0.4,
"zone-c": 0.3,
},
sliceInfo: &SliceInfo{
ServiceKey: "ns/svc",
AddressType: discovery.AddressTypeIPv4,
ToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
}, {
Addresses: []string{"10.1.2.4"},
Zone: utilpointer.StringPtr("zone-b"),
}},
}},
},
expectedEndpointsByAddrType: nil,
expectedSlicesToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
}, {
Addresses: []string{"10.1.2.4"},
Zone: utilpointer.StringPtr("zone-b"),
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
}, {
name: "slice to create with 2 endpoints, zone ratios only require 2",
cpuRatiosByZone: map[string]float64{
"zone-a": 0.45,
"zone-b": 0.55,
},
sliceInfo: &SliceInfo{
ServiceKey: "ns/svc",
AddressType: discovery.AddressTypeIPv4,
ToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
}, {
Addresses: []string{"10.1.2.4"},
Zone: utilpointer.StringPtr("zone-b"),
}},
}},
},
expectedEndpointsByAddrType: map[discovery.AddressType]EndpointZoneInfo{
discovery.AddressTypeIPv4: {
"zone-a": 1,
"zone-b": 1,
},
},
expectedSlicesToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}, {
Addresses: []string{"10.1.2.4"},
Zone: utilpointer.StringPtr("zone-b"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}},
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{},
}, {
name: "slices to create and update within 3 zone threshold",
cpuRatiosByZone: map[string]float64{
"zone-a": 0.35,
"zone-b": 0.35,
"zone-c": 0.30,
},
sliceInfo: &SliceInfo{
ServiceKey: "ns/svc",
AddressType: discovery.AddressTypeIPv4,
ToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
}, {
Addresses: []string{"10.1.2.4"},
Zone: utilpointer.StringPtr("zone-b"),
}},
}, {
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.3.3"},
Zone: utilpointer.StringPtr("zone-c"),
}, {
Addresses: []string{"10.1.3.4"},
Zone: utilpointer.StringPtr("zone-c"),
}, {
Addresses: []string{"10.1.3.4"},
Zone: utilpointer.StringPtr("zone-a"),
}},
}},
ToUpdate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.2.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
}, {
Addresses: []string{"10.2.2.4"},
Zone: utilpointer.StringPtr("zone-a"),
}},
}, {
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.2.3.3"},
Zone: utilpointer.StringPtr("zone-b"),
}, {
Addresses: []string{"10.2.3.4"},
Zone: utilpointer.StringPtr("zone-c"),
}, {
Addresses: []string{"10.2.3.4"},
Zone: utilpointer.StringPtr("zone-a"),
}},
}},
},
expectedEndpointsByAddrType: map[discovery.AddressType]EndpointZoneInfo{
discovery.AddressTypeIPv4: {
"zone-a": 4,
"zone-b": 3,
"zone-c": 3,
},
},
expectedSlicesToCreate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}},
}, {
Addresses: []string{"10.1.2.4"},
Zone: utilpointer.StringPtr("zone-b"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}},
}},
}, {
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.1.3.3"},
Zone: utilpointer.StringPtr("zone-c"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-c"}}},
}, {
Addresses: []string{"10.1.3.4"},
Zone: utilpointer.StringPtr("zone-c"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-c"}}},
}, {
Addresses: []string{"10.1.3.4"},
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}},
}},
expectedSlicesToUpdate: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.2.2.3"},
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}, {
Addresses: []string{"10.2.2.4"},
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}},
}, {
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.2.3.3"},
Zone: utilpointer.StringPtr("zone-b"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-b"}}},
}, {
Addresses: []string{"10.2.3.4"},
Zone: utilpointer.StringPtr("zone-c"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-c"}}},
}, {
Addresses: []string{"10.2.3.4"},
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}},
}},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cache := NewTopologyCache()
cache.cpuRatiosByZone = tc.cpuRatiosByZone
slicesToCreate, slicesToUpdate := cache.AddHints(tc.sliceInfo)
expectEquivalentSlices(t, slicesToCreate, tc.expectedSlicesToCreate)
expectEquivalentSlices(t, slicesToUpdate, tc.expectedSlicesToUpdate)
endpointsByAddrType, ok := cache.endpointsByService[tc.sliceInfo.ServiceKey]
if tc.expectedEndpointsByAddrType == nil {
if ok {
t.Errorf("Expected no endpoints for Service %s, got %+v", tc.sliceInfo.ServiceKey, endpointsByAddrType)
}
} else {
if len(tc.expectedEndpointsByAddrType) != len(endpointsByAddrType) {
t.Fatalf("Expected endpoints for %d address types, got %d", len(tc.expectedEndpointsByAddrType), len(endpointsByAddrType))
}
for addrType, expectedEndpointZoneInfo := range tc.expectedEndpointsByAddrType {
endpointZoneInfo, ok := endpointsByAddrType[addrType]
if !ok {
t.Fatalf("Expected endpoints for %s address type, got none", addrType)
}
if len(expectedEndpointZoneInfo) != len(endpointZoneInfo) {
t.Fatalf("Expected endpoints for %d zones, got %d", len(expectedEndpointZoneInfo), len(endpointZoneInfo))
}
for zone, expectedNum := range expectedEndpointZoneInfo {
num, ok := endpointZoneInfo[zone]
if !ok {
t.Fatalf("Expected endpoints for %s zone, got none", zone)
}
if num != expectedNum {
t.Errorf("Expected %d endpoints for %s zone, got %d", expectedNum, zone, num)
}
}
}
}
})
}
}
func TestSetNodes(t *testing.T) {
type nodeInfo struct {
zone string
cpu resource.Quantity
ready v1.ConditionStatus
}
testCases := []struct {
name string
nodes []nodeInfo
expectSufficientNodeInfo bool
expectedCPUByZone map[string]*resource.Quantity
expectedRatios map[string]float64
}{{
name: "empty",
nodes: []nodeInfo{},
expectSufficientNodeInfo: false,
expectedCPUByZone: nil,
expectedRatios: nil,
}, {
name: "single node",
nodes: []nodeInfo{
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
},
expectSufficientNodeInfo: false,
expectedCPUByZone: nil,
expectedRatios: nil,
}, {
name: "single zone",
nodes: []nodeInfo{
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
},
expectSufficientNodeInfo: false,
expectedCPUByZone: nil,
expectedRatios: nil,
}, {
name: "2 zones",
nodes: []nodeInfo{
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
{zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
},
expectSufficientNodeInfo: true,
expectedCPUByZone: map[string]*resource.Quantity{
"zone-a": resource.NewQuantity(1, resource.BinarySI),
"zone-b": resource.NewQuantity(1, resource.BinarySI),
},
expectedRatios: map[string]float64{
"zone-a": 0.5,
"zone-b": 0.5,
},
}, {
name: "2 zones, unready node in 1, ready node in 1",
nodes: []nodeInfo{
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionFalse},
{zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
},
expectSufficientNodeInfo: false,
expectedCPUByZone: nil,
expectedRatios: nil,
}, {
name: "2 zones, unready node in 1, ready node in 2",
nodes: []nodeInfo{
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
{zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
{zone: "zone-b", cpu: resource.MustParse("1000m"), ready: v1.ConditionFalse},
},
expectSufficientNodeInfo: true,
expectedCPUByZone: map[string]*resource.Quantity{
"zone-a": resource.NewQuantity(1, resource.BinarySI),
"zone-b": resource.NewQuantity(1, resource.BinarySI),
},
expectedRatios: map[string]float64{
"zone-a": 0.5,
"zone-b": 0.5,
},
}, {
name: "3 zones, 4 nodes in 1, 2 nodes in 1, 1 node in 1",
nodes: []nodeInfo{
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
{zone: "zone-a", cpu: resource.MustParse("1000m"), ready: v1.ConditionTrue},
{zone: "zone-a", cpu: resource.MustParse("2000m"), ready: v1.ConditionTrue},
{zone: "zone-b", cpu: resource.MustParse("3000m"), ready: v1.ConditionTrue},
{zone: "zone-b", cpu: resource.MustParse("1500m"), ready: v1.ConditionTrue},
{zone: "zone-c", cpu: resource.MustParse("500m"), ready: v1.ConditionTrue},
},
expectSufficientNodeInfo: true,
expectedCPUByZone: map[string]*resource.Quantity{
"zone-a": resource.NewMilliQuantity(5000, resource.BinarySI),
"zone-b": resource.NewMilliQuantity(4500, resource.BinarySI),
"zone-c": resource.NewMilliQuantity(500, resource.BinarySI),
},
expectedRatios: map[string]float64{
"zone-a": 0.5,
"zone-b": 0.45,
"zone-c": 0.05,
},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cache := NewTopologyCache()
nodes := make([]*v1.Node, 0, len(tc.nodes))
for _, node := range tc.nodes {
labels := map[string]string{}
if node.zone != "" {
labels[v1.LabelTopologyZone] = node.zone
}
conditions := []v1.NodeCondition{{
Type: v1.NodeReady,
Status: node.ready,
}}
allocatable := v1.ResourceList{
v1.ResourceCPU: node.cpu,
}
nodes = append(nodes, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Labels: labels,
},
Status: v1.NodeStatus{
Allocatable: allocatable,
Conditions: conditions,
},
})
}
cache.SetNodes(nodes)
if cache.sufficientNodeInfo != tc.expectSufficientNodeInfo {
t.Errorf("Expected sufficientNodeInfo to be %t, got %t", tc.expectSufficientNodeInfo, cache.sufficientNodeInfo)
}
if cache.cpuRatiosByZone == nil || tc.expectedRatios == nil {
if (cache.cpuRatiosByZone == nil) != (tc.expectedRatios == nil) {
t.Errorf("Expected %+v, got %+v", tc.expectedRatios, cache.cpuRatiosByZone)
}
} else {
if len(cache.cpuRatiosByZone) != len(tc.expectedRatios) {
t.Errorf("Expected ratios with %d zones, got %d", len(tc.expectedRatios), len(cache.cpuRatiosByZone))
}
for zone, expectedRatio := range tc.expectedRatios {
actualRatio, ok := cache.cpuRatiosByZone[zone]
if !ok {
t.Errorf("Expected ratio for %s zone, got none", zone)
} else if actualRatio != expectedRatio {
t.Errorf("Expected ratio to be %f, got %f", expectedRatio, actualRatio)
}
}
}
if cache.cpuByZone == nil || tc.expectedCPUByZone == nil {
if (cache.cpuByZone == nil) != (tc.expectedCPUByZone == nil) {
t.Errorf("Expected %+v, got %+v", tc.expectedCPUByZone, cache.cpuByZone)
}
} else {
if len(cache.cpuByZone) != len(tc.expectedCPUByZone) {
t.Errorf("Expected CPU with %d zones, got %d", len(tc.expectedCPUByZone), len(cache.cpuByZone))
}
for zone, expectedCPU := range tc.expectedCPUByZone {
actualCPU, ok := cache.cpuByZone[zone]
if !ok {
t.Errorf("Expected CPU for %s zone, got none", zone)
} else if !actualCPU.Equal(*expectedCPU) {
t.Errorf("Expected CPU to be %d, got %d", expectedCPU.MilliValue(), actualCPU.MilliValue())
}
}
}
})
}
}
// Test Helpers
func expectEquivalentSlices(t *testing.T, actualSlices, expectedSlices []*discovery.EndpointSlice) {
t.Helper()
if len(actualSlices) != len(expectedSlices) {
t.Fatalf("Expected %d slices, got %d", len(expectedSlices), len(actualSlices))
}
for i, expectedSlice := range expectedSlices {
actualSlice := actualSlices[i]
if len(expectedSlice.Endpoints) != len(actualSlice.Endpoints) {
t.Errorf("Expected %d endpoints, got %d", len(expectedSlice.Endpoints), len(actualSlice.Endpoints))
continue
}
for j, expectedEndpoint := range expectedSlice.Endpoints {
actualEndpoint := actualSlice.Endpoints[j]
if !reflect.DeepEqual(actualEndpoint, expectedEndpoint) {
t.Errorf("Endpoints didn't match\nExpected: %+v\nGot: %+v", expectedEndpoint, actualEndpoint)
}
}
}
}
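The tests above exercise SetNodes and AddHints separately. As a point of reference, here is a minimal sketch, not part of this commit, of how the two are expected to be used together: SetNodes establishes per-zone CPU ratios from ready Nodes, then AddHints annotates the slices described by a SliceInfo. Only NewTopologyCache, SetNodes, AddHints, and the SliceInfo fields exercised in this file are taken from the source; the function name, file layout, and values are illustrative assumptions.
package topologycache
import (
    v1 "k8s.io/api/core/v1"
    discovery "k8s.io/api/discovery/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    utilpointer "k8s.io/utils/pointer"
)
// exampleAddHintsFlow is a sketch only. Two ready nodes in different zones give
// the cache enough information to compute per-zone CPU ratios; all endpoints
// sit in zone-a, so AddHints is expected to hint some of them toward zone-b.
func exampleAddHintsFlow() (toCreate, toUpdate []*discovery.EndpointSlice) {
    cache := NewTopologyCache()
    makeNode := func(zone string) *v1.Node {
        return &v1.Node{
            ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelTopologyZone: zone}},
            Status: v1.NodeStatus{
                Allocatable: v1.ResourceList{v1.ResourceCPU: resource.MustParse("1000m")},
                Conditions:  []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}},
            },
        }
    }
    cache.SetNodes([]*v1.Node{makeNode("zone-a"), makeNode("zone-b")})
    si := &SliceInfo{
        ServiceKey: "default/example", // illustrative Service key
        ToUpdate: []*discovery.EndpointSlice{{
            AddressType: discovery.AddressTypeIPv4,
            Endpoints: []discovery.Endpoint{
                {Zone: utilpointer.StringPtr("zone-a"), Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}},
                {Zone: utilpointer.StringPtr("zone-a"), Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)}},
            },
        }},
    }
    return cache.AddHints(si)
}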

View File

@ -0,0 +1,246 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologycache
import (
"math"
"k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
"k8s.io/klog/v2"
)
// RemoveHintsFromSlices removes topology hints on EndpointSlices and returns
// updated lists of EndpointSlices to create and update.
func RemoveHintsFromSlices(si *SliceInfo) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice) {
// Remove hints on all EndpointSlices we were already going to change.
slices := append(si.ToCreate, si.ToUpdate...)
for _, slice := range slices {
for i := range slice.Endpoints {
slice.Endpoints[i].Hints = nil
}
}
// Remove hints on all unchanged EndpointSlices and mark them for update
// if any already had hints. We use j to track the number/index of slices
// that are still unchanged.
j := 0
for _, slice := range si.Unchanged {
changed := false
for i, endpoint := range slice.Endpoints {
if endpoint.Hints != nil {
// Unchanged slices are still direct copies from informer cache.
// Need to deep copy before we make any modifications to avoid
// accidentally changing informer cache.
slice = slice.DeepCopy()
slice.Endpoints[i].Hints = nil
changed = true
}
}
if changed {
si.ToUpdate = append(si.ToUpdate, slice)
} else {
si.Unchanged[j] = slice
j++
}
}
// truncate si.Unchanged so it only includes slices that are still
// unchanged.
si.Unchanged = si.Unchanged[:j]
return si.ToCreate, si.ToUpdate
}
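A hedged usage sketch of the function above: slices taken straight from the informer cache go into SliceInfo.Unchanged, and RemoveHintsFromSlices deep-copies any of them that still carry hints before moving them to the update list, so cached objects are never mutated. The wrapper name and Service key are illustrative; SliceInfo and its fields are the ones used throughout this package.
// clearHintsSketch is illustrative only and assumes it lives in this package.
func clearHintsSketch(toUpdate, unchanged []*discovery.EndpointSlice) ([]*discovery.EndpointSlice, []*discovery.EndpointSlice) {
    si := &SliceInfo{
        ServiceKey: "default/example", // illustrative
        ToUpdate:   toUpdate,
        Unchanged:  unchanged, // typically direct references from the informer cache
    }
    return RemoveHintsFromSlices(si)
}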
// redistributeHints redistributes hints based on the provided EndpointSlices.
// It allocates endpoints from the provided givingZones to the provided
// receivingZones. This returns a map that represents the changes in allocated
// endpoints by zone.
func redistributeHints(slices []*discovery.EndpointSlice, givingZones, receivingZones map[string]int) map[string]int {
redistributions := map[string]int{}
for _, slice := range slices {
for i, endpoint := range slice.Endpoints {
if len(givingZones) == 0 || len(receivingZones) == 0 {
return redistributions
}
if endpoint.Zone == nil || *endpoint.Zone == "" {
// This should always be caught earlier in AddHints()
klog.Warningf("Endpoint found without zone specified")
continue
}
givingZone := *endpoint.Zone
numToGive, ok := givingZones[givingZone]
if ok && numToGive > 0 {
for receivingZone, numToReceive := range receivingZones {
if numToReceive > 0 {
slice.Endpoints[i].Hints = &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: receivingZone}}}
if numToGive == 1 {
delete(givingZones, givingZone)
} else {
givingZones[givingZone]--
}
if numToReceive == 1 {
delete(receivingZones, receivingZone)
} else {
receivingZones[receivingZone]--
}
redistributions[receivingZone]++
redistributions[givingZone]--
break
}
}
}
}
}
return redistributions
}
// getGivingAndReceivingZones returns the number of endpoints each zone should
// give to other zones along with the number of endpoints each zone should
// receive from other zones. This is calculated with the provided allocations
// (desired state) and allocatedHintsByZone (current state).
func getGivingAndReceivingZones(allocations map[string]Allocation, allocatedHintsByZone map[string]int) (map[string]int, map[string]int) {
// 1. Determine the precise number of additional endpoints each zone has
// (giving) or needs (receiving).
givingZonesDesired := map[string]float64{}
receivingZonesDesired := map[string]float64{}
for zone, allocation := range allocations {
allocatedHints, _ := allocatedHintsByZone[zone]
target := allocation.Desired
if float64(allocatedHints) > target {
givingZonesDesired[zone] = float64(allocatedHints) - target
} else if float64(allocatedHints) < target {
receivingZonesDesired[zone] = target - float64(allocatedHints)
}
}
// 2. Convert the precise numbers needed into ints representing real
// endpoints given from one zone to another.
givingZones := map[string]int{}
receivingZones := map[string]int{}
for {
givingZone, numToGive := getMost(givingZonesDesired)
receivingZone, numToReceive := getMost(receivingZonesDesired)
// stop redistributing if any of the following are true:
// - the giving OR receiving zone is unspecified
// - giving AND receiving zones have less than 1 endpoint left to give or receive
// - giving OR receiving zones have less than 0.5 endpoints left to give or receive
if givingZone == "" || receivingZone == "" || (numToGive < 1.0 && numToReceive < 1.0) || numToGive < 0.5 || numToReceive < 0.5 {
break
}
givingZones[givingZone]++
givingZonesDesired[givingZone]--
receivingZones[receivingZone]++
receivingZonesDesired[receivingZone]--
}
return givingZones, receivingZones
}
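A worked example, borrowed from one of the test cases in utils_test.go below, may help: with desired allocations of 5.0, 5.6, and 5.4 endpoints for zones a, b, and c, and 16 endpoints currently hinted to zone-a, step 1 computes a surplus of 11 for zone-a and deficits of 5.6 and 5.4 for the other two zones; step 2 then repeatedly pairs the largest surplus with the largest deficit, producing 11 moves out of zone-a, 6 into zone-b, and 5 into zone-c. The sketch below is illustrative only and assumes it lives in this package.
// givingReceivingSketch is illustrative only; the inputs and expected outputs
// mirror a test case in utils_test.go.
func givingReceivingSketch() (map[string]int, map[string]int) {
    allocations := map[string]Allocation{
        "zone-a": {Desired: 5.0},
        "zone-b": {Desired: 5.6},
        "zone-c": {Desired: 5.4},
    }
    allocatedHintsByZone := map[string]int{"zone-a": 16}
    // Expected result: giving = map[zone-a:11], receiving = map[zone-b:6 zone-c:5]
    return getGivingAndReceivingZones(allocations, allocatedHintsByZone)
}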
// getMost accepts a map[string]float64 and returns the key and value
// representing the greatest value in the provided map. This function is not
// very efficient, but the map is rarely expected to have more than 2 entries.
func getMost(zones map[string]float64) (string, float64) {
zone := ""
num := 0.0
for z, n := range zones {
if n > num {
zone = z
num = n
}
}
return zone, num
}
// getHintsByZone returns the number of hints allocated to each zone by the
// provided EndpointSlice. This function returns nil to indicate that the
// current allocations are invalid and that the EndpointSlice needs to be
// updated. This could be caused by:
// - A hint for a zone that no longer requires any allocations.
// - An endpoint with no hints.
// - Hints that would make minimum allocations impossible.
func getHintsByZone(slice *discovery.EndpointSlice, allocatedHintsByZone EndpointZoneInfo, allocations map[string]Allocation) map[string]int {
hintsByZone := map[string]int{}
for _, endpoint := range slice.Endpoints {
if endpoint.Hints == nil || len(endpoint.Hints.ForZones) == 0 {
return nil
}
zone := endpoint.Hints.ForZones[0].Name
if _, ok := allocations[zone]; !ok {
return nil
}
hintsByZone[zone]++
}
for zone, numHints := range hintsByZone {
alreadyAllocated, _ := allocatedHintsByZone[zone]
allocation, ok := allocations[zone]
if !ok || (numHints+alreadyAllocated) > allocation.Maximum {
return nil
}
}
return hintsByZone
}
// serviceOverloaded returns true if the Service has an insufficient number of
// endpoints in any zone.
func serviceOverloaded(ezi EndpointZoneInfo, zoneRatios map[string]float64) bool {
if len(ezi) == 0 {
return false
}
if len(zoneRatios) == 0 {
return true
}
totalEndpoints := 0.0
for _, numEndpoints := range ezi {
totalEndpoints += float64(numEndpoints)
}
for zone, ratio := range zoneRatios {
svcEndpoints, ok := ezi[zone]
if !ok {
return true
}
minEndpoints := math.Ceil(totalEndpoints * ratio * (1 / (1 + OverloadThreshold)))
if svcEndpoints < int(minEndpoints) {
return true
}
}
return false
}
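To make the threshold arithmetic concrete: with 10 endpoints split 4 and 6 across two zones that each hold half of the cluster's CPU, the minimum per zone is ceil(10 * 0.5 / (1 + OverloadThreshold)). Assuming OverloadThreshold is 0.2 (the constant is defined elsewhere in this package, so treat that value as an assumption), that is ceil(4.17) = 5, and the 4-endpoint zone marks the Service as overloaded. The sketch below is illustrative only and assumes EndpointZoneInfo is the map type used throughout this package.
// overloadSketch is illustrative only.
func overloadSketch() bool {
    ezi := EndpointZoneInfo{"zone-a": 4, "zone-b": 6}
    zoneRatios := map[string]float64{"zone-a": 0.5, "zone-b": 0.5}
    // Assuming OverloadThreshold is 0.2, this returns true: zone-a holds 4
    // endpoints but needs at least ceil(10 * 0.5 / 1.2) = 5.
    return serviceOverloaded(ezi, zoneRatios)
}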
// NodeReady returns true if the Node has a status condition of type "NodeReady"
// with a status of "True".
func NodeReady(nodeStatus v1.NodeStatus) bool {
for _, cond := range nodeStatus.Conditions {
if cond.Type == v1.NodeReady {
return cond.Status == v1.ConditionTrue
}
}
return false
}

View File

@ -0,0 +1,195 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package topologycache
import (
"reflect"
"testing"
discovery "k8s.io/api/discovery/v1"
utilpointer "k8s.io/utils/pointer"
)
func Test_redistributeHints(t *testing.T) {
testCases := []struct {
name string
slices []*discovery.EndpointSlice
givingZones map[string]int
receivingZones map[string]int
expectedRedistributions map[string]int
}{{
name: "empty",
slices: []*discovery.EndpointSlice{},
givingZones: map[string]int{},
receivingZones: map[string]int{},
expectedRedistributions: map[string]int{},
}, {
name: "single endpoint",
slices: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}},
}},
givingZones: map[string]int{"zone-a": 1},
receivingZones: map[string]int{"zone-b": 1},
expectedRedistributions: map[string]int{"zone-a": -1, "zone-b": 1},
}, {
name: "endpoints from 1 zone redistributed to 2 other zones",
slices: []*discovery.EndpointSlice{{
Endpoints: []discovery.Endpoint{{
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}, {
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}, {
Zone: utilpointer.StringPtr("zone-a"),
Hints: &discovery.EndpointHints{ForZones: []discovery.ForZone{{Name: "zone-a"}}},
}},
}},
givingZones: map[string]int{"zone-a": 2},
receivingZones: map[string]int{"zone-b": 1, "zone-c": 1},
expectedRedistributions: map[string]int{"zone-a": -2, "zone-b": 1, "zone-c": 1},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualRedistributions := redistributeHints(tc.slices, tc.givingZones, tc.receivingZones)
if len(actualRedistributions) != len(tc.expectedRedistributions) {
t.Fatalf("Expected redistributions for %d zones, got %d (%+v)", len(tc.expectedRedistributions), len(actualRedistributions), actualRedistributions)
}
for zone, expectedNum := range tc.expectedRedistributions {
actualNum, _ := actualRedistributions[zone]
if actualNum != expectedNum {
t.Errorf("Expected redistribution of %d for zone %s, got %d", expectedNum, zone, actualNum)
}
}
})
}
}
func Test_getGivingAndReceivingZones(t *testing.T) {
testCases := []struct {
name string
allocations map[string]Allocation
allocatedHintsByZone map[string]int
expectedGivingZones map[string]int
expectedReceivingZones map[string]int
}{{
name: "empty",
allocations: map[string]Allocation{},
allocatedHintsByZone: map[string]int{},
expectedGivingZones: map[string]int{},
expectedReceivingZones: map[string]int{},
}, {
name: "simple allocation with no need for rebalancing",
allocations: map[string]Allocation{
"zone-a": {Desired: 1.2},
"zone-b": {Desired: 1.1},
"zone-c": {Desired: 1.0},
},
allocatedHintsByZone: map[string]int{"zone-a": 1, "zone-b": 1, "zone-c": 1},
expectedGivingZones: map[string]int{},
expectedReceivingZones: map[string]int{},
}, {
name: "preference for same zone even when giving an extra endpoint would result in slightly better distribution",
allocations: map[string]Allocation{
"zone-a": {Desired: 5.1},
"zone-b": {Desired: 5.1},
"zone-c": {Desired: 5.8},
},
allocatedHintsByZone: map[string]int{"zone-a": 16},
expectedGivingZones: map[string]int{"zone-a": 10},
expectedReceivingZones: map[string]int{"zone-b": 5, "zone-c": 5},
}, {
name: "when 2 zones need < 1 endpoint, give to zone that needs endpoint most",
allocations: map[string]Allocation{
"zone-a": {Desired: 5.0},
"zone-b": {Desired: 5.6},
"zone-c": {Desired: 5.4},
},
allocatedHintsByZone: map[string]int{"zone-a": 16},
expectedGivingZones: map[string]int{"zone-a": 11},
expectedReceivingZones: map[string]int{"zone-b": 6, "zone-c": 5},
}, {
name: "when 2 zones have extra endpoints, give from zone with most extra",
allocations: map[string]Allocation{
"zone-a": {Desired: 5.0},
"zone-b": {Desired: 5.6},
"zone-c": {Desired: 5.4},
},
allocatedHintsByZone: map[string]int{"zone-b": 8, "zone-c": 8},
expectedGivingZones: map[string]int{"zone-b": 2, "zone-c": 3},
expectedReceivingZones: map[string]int{"zone-a": 5},
}, {
name: "ensure function can handle unexpected data (more allocated than allocations)",
allocations: map[string]Allocation{
"zone-a": {Desired: 5.0},
"zone-b": {Desired: 5.0},
"zone-c": {Desired: 5.0},
},
allocatedHintsByZone: map[string]int{"zone-a": 6, "zone-b": 6, "zone-c": 6},
expectedGivingZones: map[string]int{},
expectedReceivingZones: map[string]int{},
}, {
name: "ensure function can handle unexpected data (negative allocations)",
allocations: map[string]Allocation{
"zone-a": {Desired: -5.0},
"zone-b": {Desired: -5.0},
"zone-c": {Desired: -5.0},
},
allocatedHintsByZone: map[string]int{"zone-a": 6, "zone-b": 6, "zone-c": 6},
expectedGivingZones: map[string]int{},
expectedReceivingZones: map[string]int{},
}, {
name: "ensure function can handle unexpected data (negative allocated)",
allocations: map[string]Allocation{
"zone-a": {Desired: 5.0},
"zone-b": {Desired: 5.0},
"zone-c": {Desired: 5.0},
},
allocatedHintsByZone: map[string]int{"zone-a": -4, "zone-b": -3, "zone-c": -2},
expectedGivingZones: map[string]int{},
expectedReceivingZones: map[string]int{},
}, {
name: "ensure function can handle unexpected data (negative for 1 zone)",
allocations: map[string]Allocation{
"zone-a": {Desired: 5.0},
"zone-b": {Desired: 5.0},
"zone-c": {Desired: 5.0},
},
allocatedHintsByZone: map[string]int{"zone-a": -40, "zone-b": 20, "zone-c": 20},
expectedGivingZones: map[string]int{"zone-b": 15, "zone-c": 15},
expectedReceivingZones: map[string]int{"zone-a": 30},
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actualGivingZones, actualReceivingZones := getGivingAndReceivingZones(tc.allocations, tc.allocatedHintsByZone)
if !reflect.DeepEqual(actualGivingZones, tc.expectedGivingZones) {
t.Errorf("Expected %+v giving zones, got %+v", tc.expectedGivingZones, actualGivingZones)
}
if !reflect.DeepEqual(actualReceivingZones, tc.expectedReceivingZones) {
t.Errorf("Expected %+v receiving zones, got %+v", tc.expectedReceivingZones, actualReceivingZones)
}
})
}
}

View File

@ -27,6 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@ -365,3 +366,31 @@ func getAddressTypesForService(service *corev1.Service) map[discovery.AddressTyp
klog.V(2).Infof("couldn't find ipfamilies for headless service: %v/%v likely because controller manager is likely connected to an old apiserver that does not support ip families yet. The service endpoint slice will use dual stack families until api-server default it correctly", service.Namespace, service.Name)
return serviceSupportedAddresses
}
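// unchangedSlices returns the EndpointSlices in existingSlices that are not
// included in slicesToUpdate or slicesToDelete.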
func unchangedSlices(existingSlices, slicesToUpdate, slicesToDelete []*discovery.EndpointSlice) []*discovery.EndpointSlice {
changedSliceNames := sets.String{}
for _, slice := range slicesToUpdate {
changedSliceNames.Insert(slice.Name)
}
for _, slice := range slicesToDelete {
changedSliceNames.Insert(slice.Name)
}
unchangedSlices := []*discovery.EndpointSlice{}
for _, slice := range existingSlices {
if !changedSliceNames.Has(slice.Name) {
unchangedSlices = append(unchangedSlices, slice)
}
}
return unchangedSlices
}
// hintsEnabled returns true if the provided annotations include a
// corev1.AnnotationTopologyAwareHints key with a value set to "auto".
func hintsEnabled(annotations map[string]string) bool {
val, ok := annotations[corev1.AnnotationTopologyAwareHints]
if !ok {
return false
}
return val == "auto"
}
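For reference, a brief hedged sketch of what hintsEnabled checks for: a Service opts in by setting the corev1.AnnotationTopologyAwareHints annotation referenced above to "auto". The Service definition below is illustrative and assumes the sketch lives alongside this file in the endpointslice package, where corev1 and metav1 are already imported.
// hintsEnabledSketch is illustrative only.
func hintsEnabledSketch() bool {
    svc := &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name:      "example",
            Namespace: "default",
            Annotations: map[string]string{
                corev1.AnnotationTopologyAwareHints: "auto",
            },
        },
    }
    return hintsEnabled(svc.Annotations) // true
}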