Merge pull request #125294 from hbostan/master

Add a new workqueue to endpointslice controller for updating topology cache and checking node topology distribution.
This commit is contained in:
Kubernetes Prow Robot 2024-06-04 02:44:23 -07:00 committed by GitHub
commit 0122991a3c
2 changed files with 78 additions and 44 deletions
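
For orientation before the diff: a standalone sketch of the two-queue worker fan-out this change adopts, with stand-in work functions in place of syncService and checkNodeTopologyDistribution. The queue construction and the constant key mirror the diff below; the import paths assume k8s.io/client-go and k8s.io/apimachinery are on the module path, and the service key "default/my-service" is purely illustrative.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// topologyQueueItemKey mirrors the constant added in the diff: every node
// event enqueues this same key.
const topologyQueueItemKey = "topologyQueueItemKey"

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	serviceQueue := workqueue.NewTypedRateLimitingQueue[string](
		workqueue.DefaultTypedControllerRateLimiter[string](),
	)
	topologyQueue := workqueue.NewTypedRateLimitingQueue[string](
		workqueue.DefaultTypedControllerRateLimiter[string](),
	)
	defer serviceQueue.ShutDown()
	defer topologyQueue.ShutDown()

	// Several workers drain the service queue concurrently.
	for i := 0; i < 2; i++ {
		go wait.Until(func() {
			for processNextService(serviceQueue) {
			}
		}, time.Second, ctx.Done())
	}
	// Exactly one worker drains the topology queue; every node event enqueues
	// the same key, so one worker is sufficient.
	go wait.Until(func() {
		for processNextTopology(topologyQueue) {
		}
	}, time.Second, ctx.Done())

	// Simulate a node event and a service event.
	topologyQueue.Add(topologyQueueItemKey)
	serviceQueue.Add("default/my-service") // illustrative key only

	<-ctx.Done()
}

// processNextService is a stand-in for processNextServiceWorkItem/syncService.
func processNextService(q workqueue.TypedRateLimitingInterface[string]) bool {
	key, quit := q.Get()
	if quit {
		return false
	}
	defer q.Done(key)
	fmt.Println("syncing service", key)
	q.Forget(key)
	return true
}

// processNextTopology is a stand-in for processNextTopologyWorkItem; the real
// controller runs checkNodeTopologyDistribution here and re-queues any
// overloaded Services onto the service queue.
func processNextTopology(q workqueue.TypedRateLimitingInterface[string]) bool {
	key, quit := q.Get()
	if quit {
		return false
	}
	defer q.Done(key)
	fmt.Println("checking node topology distribution")
	return true
}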

View File

@@ -75,6 +75,9 @@ const (
// controllerName is a unique value used with LabelManagedBy to indicate
// the component managing an EndpointSlice.
controllerName = "endpointslice-controller.k8s.io"
// topologyQueueItemKey is the key for all items in the topologyQueue.
topologyQueueItemKey = "topologyQueueItemKey"
)
// NewController creates and initializes a new Controller
@@ -99,7 +102,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
// such as an update to a Service or Deployment. A more significant
// rate limit back off here helps ensure that the Controller does not
// overwhelm the API Server.
queue: workqueue.NewTypedRateLimitingQueueWithConfig(
serviceQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[string](defaultSyncBackOff, maxSyncBackOff),
// 10 qps, 100 bucket size. This is only for retry speed and its
@@ -110,6 +113,9 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
Name: "endpoint_slice",
},
),
topologyQueue: workqueue.NewTypedRateLimitingQueue[string](
workqueue.DefaultTypedControllerRateLimiter[string](),
),
workerLoopPeriod: time.Second,
}
@@ -158,14 +164,14 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.addNode(logger, obj)
AddFunc: func(_ interface{}) {
c.addNode()
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.updateNode(logger, oldObj, newObj)
c.updateNode(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
c.deleteNode(logger, obj)
DeleteFunc: func(_ interface{}) {
c.deleteNode()
},
})
@@ -236,7 +242,11 @@ type Controller struct {
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue workqueue.TypedRateLimitingInterface[string]
serviceQueue workqueue.TypedRateLimitingInterface[string]
// topologyQueue is used to trigger a topology cache update and to check node
// topology distribution.
topologyQueue workqueue.TypedRateLimitingInterface[string]
// maxEndpointsPerSlice references the maximum number of endpoints that
// should be added to an EndpointSlice
@@ -264,7 +274,8 @@ func (c *Controller) Run(ctx context.Context, workers int) {
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
defer c.queue.ShutDown()
defer c.serviceQueue.ShutDown()
defer c.topologyQueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting endpoint slice controller")
@@ -274,29 +285,31 @@ func (c *Controller) Run(ctx context.Context, workers int) {
return
}
logger.V(2).Info("Starting worker threads", "total", workers)
logger.V(2).Info("Starting service queue worker threads", "total", workers)
for i := 0; i < workers; i++ {
go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
}
logger.V(2).Info("Starting topology queue worker threads", "total", 1)
go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
<-ctx.Done()
}
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service
// at the same time
func (c *Controller) worker(logger klog.Logger) {
for c.processNextWorkItem(logger) {
// serviceQueueWorker runs a worker thread that just dequeues items, processes
// them, and marks them done. You may run as many of these in parallel as you
// wish; the workqueue guarantees that they will not end up processing the same
// service at the same time
func (c *Controller) serviceQueueWorker(logger klog.Logger) {
for c.processNextServiceWorkItem(logger) {
}
}
func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
cKey, quit := c.queue.Get()
func (c *Controller) processNextServiceWorkItem(logger klog.Logger) bool {
cKey, quit := c.serviceQueue.Get()
if quit {
return false
}
defer c.queue.Done(cKey)
defer c.serviceQueue.Done(cKey)
err := c.syncService(logger, cKey)
c.handleErr(logger, err, cKey)
@@ -304,22 +317,37 @@ func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
return true
}
func (c *Controller) topologyQueueWorker(logger klog.Logger) {
for c.processNextTopologyWorkItem(logger) {
}
}
func (c *Controller) processNextTopologyWorkItem(logger klog.Logger) bool {
key, quit := c.topologyQueue.Get()
if quit {
return false
}
defer c.topologyQueue.Done(key)
c.checkNodeTopologyDistribution(logger)
return true
}
func (c *Controller) handleErr(logger klog.Logger, err error, key string) {
trackSync(err)
if err == nil {
c.queue.Forget(key)
c.serviceQueue.Forget(key)
return
}
if c.queue.NumRequeues(key) < maxRetries {
if c.serviceQueue.NumRequeues(key) < maxRetries {
logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
c.queue.AddRateLimited(key)
c.serviceQueue.AddRateLimited(key)
return
}
logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
c.queue.Forget(key)
c.serviceQueue.Forget(key)
utilruntime.HandleError(err)
}
@@ -416,7 +444,7 @@ func (c *Controller) onServiceUpdate(obj interface{}) {
return
}
c.queue.Add(key)
c.serviceQueue.Add(key)
}
// onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
@@ -427,7 +455,7 @@ func (c *Controller) onServiceDelete(obj interface{}) {
return
}
c.queue.Add(key)
c.serviceQueue.Add(key)
}
// onEndpointSliceAdd queues a sync for the relevant Service if the
@@ -500,7 +528,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo
if c.endpointUpdatesBatchPeriod > delay {
delay = c.endpointUpdatesBatchPeriod
}
c.queue.AddAfter(key, delay)
c.serviceQueue.AddAfter(key, delay)
}
func (c *Controller) addPod(obj interface{}) {
@@ -511,14 +539,14 @@ func (c *Controller) addPod(obj interface{}) {
return
}
for key := range services {
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
}
}
func (c *Controller) updatePod(old, cur interface{}) {
services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
for key := range services {
c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
}
}
@@ -531,11 +559,11 @@ func (c *Controller) deletePod(obj interface{}) {
}
}
func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
c.checkNodeTopologyDistribution(logger)
func (c *Controller) addNode() {
c.topologyQueue.Add(topologyQueueItemKey)
}
func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
func (c *Controller) updateNode(old, cur interface{}) {
oldNode := old.(*v1.Node)
curNode := cur.(*v1.Node)
@@ -543,12 +571,12 @@ func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
// The topology cache should be updated in this case.
if isNodeReady(oldNode) != isNodeReady(curNode) ||
oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
c.checkNodeTopologyDistribution(logger)
c.topologyQueue.Add(topologyQueueItemKey)
}
}
func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
c.checkNodeTopologyDistribution(logger)
func (c *Controller) deleteNode() {
c.topologyQueue.Add(topologyQueueItemKey)
}
// checkNodeTopologyDistribution updates Nodes in the topology cache and then
@@ -566,7 +594,7 @@ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
serviceKeys := c.topologyCache.GetOverloadedServices()
for _, serviceKey := range serviceKeys {
logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
c.queue.Add(serviceKey)
c.serviceQueue.Add(serviceKey)
}
}
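
The test changes below hinge on the fact that a workqueue holds at most one copy of a given key while it is queued: two addNode() calls produce a single queued item. A minimal standalone sketch of that coalescing behavior, assuming only that k8s.io/client-go is on the module path:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

const topologyQueueItemKey = "topologyQueueItemKey"

func main() {
	q := workqueue.NewTypedRateLimitingQueue[string](
		workqueue.DefaultTypedControllerRateLimiter[string](),
	)
	defer q.ShutDown()

	// Simulate a burst of node events arriving before the worker runs; each
	// one enqueues the same constant key.
	q.Add(topologyQueueItemKey)
	q.Add(topologyQueueItemKey)
	q.Add(topologyQueueItemKey)

	fmt.Println("queued items:", q.Len()) // prints 1: duplicate keys collapse

	// One dequeue, i.e. one checkNodeTopologyDistribution pass, covers the
	// whole burst.
	key, _ := q.Get()
	q.Done(key)
}

This is also why the diff enqueues topologyQueueItemKey from addNode, updateNode, and deleteNode instead of calling checkNodeTopologyDistribution inline: the expensive check moves off the informer event handler path, and repeated node events collapse into a single pass.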

View File

@@ -539,10 +539,10 @@ func TestOnEndpointSliceUpdate(t *testing.T) {
epSlice2 := epSlice1.DeepCopy()
epSlice2.Labels[discovery.LabelManagedBy] = "something else"
assert.Equal(t, 0, esController.queue.Len())
assert.Equal(t, 0, esController.serviceQueue.Len())
esController.onEndpointSliceUpdate(logger, epSlice1, epSlice2)
err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
if esController.queue.Len() > 0 {
if esController.serviceQueue.Len() > 0 {
return true, nil
}
return false, nil
@@ -550,7 +550,7 @@ func TestOnEndpointSliceUpdate(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error waiting for add to queue")
}
assert.Equal(t, 1, esController.queue.Len())
assert.Equal(t, 1, esController.serviceQueue.Len())
}
func TestSyncService(t *testing.T) {
@@ -1947,8 +1947,8 @@ func Test_checkNodeTopologyDistribution(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
esController.checkNodeTopologyDistribution(logger)
if esController.queue.Len() != tc.expectedQueueLen {
t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
if esController.serviceQueue.Len() != tc.expectedQueueLen {
t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.serviceQueue.Len())
}
})
}
@@ -2007,8 +2007,11 @@ func TestUpdateNode(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
esController.nodeStore.Add(node1)
esController.nodeStore.Add(node2)
esController.addNode(logger, node1)
esController.addNode(logger, node2)
esController.addNode()
esController.addNode()
assert.Equal(t, 1, esController.topologyQueue.Len())
esController.processNextTopologyWorkItem(logger)
assert.Equal(t, 0, esController.topologyQueue.Len())
// The Nodes don't have the zone label, so AddHints should fail.
_, _, eventsBuilders := esController.topologyCache.AddHints(logger, sliceInfo)
require.Len(t, eventsBuilders, 1)
@@ -2022,8 +2025,11 @@ func TestUpdateNode(t *testing.T) {
// After adding the zone label to the Nodes and calling the event handler updateNode, AddHints should succeed.
esController.nodeStore.Update(updateNode1)
esController.nodeStore.Update(updateNode2)
esController.updateNode(logger, node1, updateNode1)
esController.updateNode(logger, node2, updateNode2)
esController.updateNode(node1, updateNode1)
esController.updateNode(node2, updateNode2)
assert.Equal(t, 1, esController.topologyQueue.Len())
esController.processNextTopologyWorkItem(logger)
assert.Equal(t, 0, esController.topologyQueue.Len())
_, _, eventsBuilders = esController.topologyCache.AddHints(logger, sliceInfo)
require.Len(t, eventsBuilders, 1)
assert.Contains(t, eventsBuilders[0].Message, topologycache.TopologyAwareHintsEnabled)