From db827e67fc11621cad04c7bbbb11822500f0108a Mon Sep 17 00:00:00 2001
From: hbostan
Date: Mon, 3 Jun 2024 12:41:37 +0000
Subject: [PATCH] Add a new workqueue to endpointslice controller for updating
 topology cache and checking node topology distribution.

---
 .../endpointslice/endpointslice_controller.go | 98 ++++++++++++-------
 .../endpointslice_controller_test.go          | 24 +++--
 2 files changed, 78 insertions(+), 44 deletions(-)

diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go
index 9fd569f5261..b0b67521b0e 100644
--- a/pkg/controller/endpointslice/endpointslice_controller.go
+++ b/pkg/controller/endpointslice/endpointslice_controller.go
@@ -75,6 +75,9 @@ const (
 	// controllerName is a unique value used with LabelManagedBy to indicated
 	// the component managing an EndpointSlice.
 	controllerName = "endpointslice-controller.k8s.io"
+
+	// topologyQueueItemKey is the key for all items in the topologyQueue.
+	topologyQueueItemKey = "topologyQueueItemKey"
 )
 
 // NewController creates and initializes a new Controller
@@ -99,7 +102,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 		// such as an update to a Service or Deployment. A more significant
 		// rate limit back off here helps ensure that the Controller does not
 		// overwhelm the API Server.
-		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
+		serviceQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
 			workqueue.NewTypedMaxOfRateLimiter(
 				workqueue.NewTypedItemExponentialFailureRateLimiter[string](defaultSyncBackOff, maxSyncBackOff),
 				// 10 qps, 100 bucket size. This is only for retry speed and its
@@ -110,6 +113,9 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 				Name: "endpoint_slice",
 			},
 		),
+		topologyQueue: workqueue.NewTypedRateLimitingQueue[string](
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+		),
 		workerLoopPeriod: time.Second,
 	}
 
@@ -158,14 +164,14 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer,
 
 	if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) {
 		nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-			AddFunc: func(obj interface{}) {
-				c.addNode(logger, obj)
+			AddFunc: func(_ interface{}) {
+				c.addNode()
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
-				c.updateNode(logger, oldObj, newObj)
+				c.updateNode(oldObj, newObj)
 			},
-			DeleteFunc: func(obj interface{}) {
-				c.deleteNode(logger, obj)
+			DeleteFunc: func(_ interface{}) {
+				c.deleteNode()
 			},
 		})
 
@@ -236,7 +242,11 @@ type Controller struct {
 	// more often than services with few pods; it also would cause a
 	// service that's inserted multiple times to be processed more than
 	// necessary.
-	queue workqueue.TypedRateLimitingInterface[string]
+	serviceQueue workqueue.TypedRateLimitingInterface[string]
+
+	// topologyQueue is used to trigger a topology cache update and check node
+	// topology distribution.
+	topologyQueue workqueue.TypedRateLimitingInterface[string]
 
 	// maxEndpointsPerSlice references the maximum number of endpoints that
 	// should be added to an EndpointSlice
@@ -264,7 +274,8 @@ func (c *Controller) Run(ctx context.Context, workers int) {
 	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
 	defer c.eventBroadcaster.Shutdown()
 
-	defer c.queue.ShutDown()
+	defer c.serviceQueue.ShutDown()
+	defer c.topologyQueue.ShutDown()
 
 	logger := klog.FromContext(ctx)
 	logger.Info("Starting endpoint slice controller")
@@ -274,29 +285,31 @@ func (c *Controller) Run(ctx context.Context, workers int) {
 		return
 	}
 
-	logger.V(2).Info("Starting worker threads", "total", workers)
+	logger.V(2).Info("Starting service queue worker threads", "total", workers)
 	for i := 0; i < workers; i++ {
-		go wait.Until(func() { c.worker(logger) }, c.workerLoopPeriod, ctx.Done())
+		go wait.Until(func() { c.serviceQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
 	}
+	logger.V(2).Info("Starting topology queue worker threads", "total", 1)
+	go wait.Until(func() { c.topologyQueueWorker(logger) }, c.workerLoopPeriod, ctx.Done())
 
 	<-ctx.Done()
 }
 
-// worker runs a worker thread that just dequeues items, processes them, and
-// marks them done. You may run as many of these in parallel as you wish; the
-// workqueue guarantees that they will not end up processing the same service
-// at the same time
-func (c *Controller) worker(logger klog.Logger) {
-	for c.processNextWorkItem(logger) {
+// serviceQueueWorker runs a worker thread that just dequeues items, processes
+// them, and marks them done. You may run as many of these in parallel as you
+// wish; the workqueue guarantees that they will not end up processing the same
+// service at the same time
+func (c *Controller) serviceQueueWorker(logger klog.Logger) {
+	for c.processNextServiceWorkItem(logger) {
 	}
 }
 
-func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
-	cKey, quit := c.queue.Get()
+func (c *Controller) processNextServiceWorkItem(logger klog.Logger) bool {
+	cKey, quit := c.serviceQueue.Get()
 	if quit {
 		return false
 	}
-	defer c.queue.Done(cKey)
+	defer c.serviceQueue.Done(cKey)
 
 	err := c.syncService(logger, cKey)
 	c.handleErr(logger, err, cKey)
@@ -304,22 +317,37 @@ func (c *Controller) processNextWorkItem(logger klog.Logger) bool {
 	return true
 }
 
+func (c *Controller) topologyQueueWorker(logger klog.Logger) {
+	for c.processNextTopologyWorkItem(logger) {
+	}
+}
+
+func (c *Controller) processNextTopologyWorkItem(logger klog.Logger) bool {
+	key, quit := c.topologyQueue.Get()
+	if quit {
+		return false
+	}
+	defer c.topologyQueue.Done(key)
+	c.checkNodeTopologyDistribution(logger)
+	return true
+}
+
 func (c *Controller) handleErr(logger klog.Logger, err error, key string) {
 	trackSync(err)
 
 	if err == nil {
-		c.queue.Forget(key)
+		c.serviceQueue.Forget(key)
 		return
 	}
 
-	if c.queue.NumRequeues(key) < maxRetries {
+	if c.serviceQueue.NumRequeues(key) < maxRetries {
 		logger.Info("Error syncing endpoint slices for service, retrying", "key", key, "err", err)
-		c.queue.AddRateLimited(key)
+		c.serviceQueue.AddRateLimited(key)
 		return
 	}
 
 	logger.Info("Retry budget exceeded, dropping service out of the queue", "key", key, "err", err)
-	c.queue.Forget(key)
+	c.serviceQueue.Forget(key)
 	utilruntime.HandleError(err)
 }
 
@@ -416,7 +444,7 @@ func (c *Controller) onServiceUpdate(obj interface{}) {
 		return
 	}
 
-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
 }
 
 // onServiceDelete removes the Service Selector from the cache and queues the Service for processing.
@@ -427,7 +455,7 @@ func (c *Controller) onServiceDelete(obj interface{}) {
 		return
 	}
 
-	c.queue.Add(key)
+	c.serviceQueue.Add(key)
 }
 
 // onEndpointSliceAdd queues a sync for the relevant Service for a sync if the
@@ -500,7 +528,7 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo
 	if c.endpointUpdatesBatchPeriod > delay {
 		delay = c.endpointUpdatesBatchPeriod
 	}
-	c.queue.AddAfter(key, delay)
+	c.serviceQueue.AddAfter(key, delay)
 }
 
 func (c *Controller) addPod(obj interface{}) {
@@ -511,14 +539,14 @@ func (c *Controller) addPod(obj interface{}) {
 		return
 	}
 	for key := range services {
-		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
 	}
 }
 
 func (c *Controller) updatePod(old, cur interface{}) {
 	services := endpointsliceutil.GetServicesToUpdateOnPodChange(c.serviceLister, old, cur)
 	for key := range services {
-		c.queue.AddAfter(key, c.endpointUpdatesBatchPeriod)
+		c.serviceQueue.AddAfter(key, c.endpointUpdatesBatchPeriod)
 	}
 }
 
@@ -531,11 +559,11 @@ func (c *Controller) deletePod(obj interface{}) {
 	}
 }
 
-func (c *Controller) addNode(logger klog.Logger, obj interface{}) {
-	c.checkNodeTopologyDistribution(logger)
+func (c *Controller) addNode() {
+	c.topologyQueue.Add(topologyQueueItemKey)
 }
 
-func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
+func (c *Controller) updateNode(old, cur interface{}) {
 	oldNode := old.(*v1.Node)
 	curNode := cur.(*v1.Node)
 
@@ -543,12 +571,12 @@ func (c *Controller) updateNode(logger klog.Logger, old, cur interface{}) {
 	// The topology cache should be updated in this case.
 	if isNodeReady(oldNode) != isNodeReady(curNode) ||
 		oldNode.Labels[v1.LabelTopologyZone] != curNode.Labels[v1.LabelTopologyZone] {
-		c.checkNodeTopologyDistribution(logger)
+		c.topologyQueue.Add(topologyQueueItemKey)
 	}
 }
 
-func (c *Controller) deleteNode(logger klog.Logger, obj interface{}) {
-	c.checkNodeTopologyDistribution(logger)
+func (c *Controller) deleteNode() {
+	c.topologyQueue.Add(topologyQueueItemKey)
 }
 
 // checkNodeTopologyDistribution updates Nodes in the topology cache and then
@@ -566,7 +594,7 @@ func (c *Controller) checkNodeTopologyDistribution(logger klog.Logger) {
 	serviceKeys := c.topologyCache.GetOverloadedServices()
 	for _, serviceKey := range serviceKeys {
 		logger.V(2).Info("Queuing Service after Node change due to overloading", "key", serviceKey)
-		c.queue.Add(serviceKey)
+		c.serviceQueue.Add(serviceKey)
 	}
 }
 
diff --git a/pkg/controller/endpointslice/endpointslice_controller_test.go b/pkg/controller/endpointslice/endpointslice_controller_test.go
index 2b0908a5509..a6a2a22cd0d 100644
--- a/pkg/controller/endpointslice/endpointslice_controller_test.go
+++ b/pkg/controller/endpointslice/endpointslice_controller_test.go
@@ -539,10 +539,10 @@ func TestOnEndpointSliceUpdate(t *testing.T) {
 	epSlice2 := epSlice1.DeepCopy()
 	epSlice2.Labels[discovery.LabelManagedBy] = "something else"
 
-	assert.Equal(t, 0, esController.queue.Len())
+	assert.Equal(t, 0, esController.serviceQueue.Len())
 	esController.onEndpointSliceUpdate(logger, epSlice1, epSlice2)
 	err := wait.PollImmediate(100*time.Millisecond, 3*time.Second, func() (bool, error) {
-		if esController.queue.Len() > 0 {
+		if esController.serviceQueue.Len() > 0 {
 			return true, nil
 		}
 		return false, nil
@@ -550,7 +550,7 @@ func TestOnEndpointSliceUpdate(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected error waiting for add to queue")
 	}
-	assert.Equal(t, 1, esController.queue.Len())
+	assert.Equal(t, 1, esController.serviceQueue.Len())
 }
 
 func TestSyncService(t *testing.T) {
@@ -1947,8 +1947,8 @@ func Test_checkNodeTopologyDistribution(t *testing.T) {
 			logger, _ := ktesting.NewTestContext(t)
 			esController.checkNodeTopologyDistribution(logger)
 
-			if esController.queue.Len() != tc.expectedQueueLen {
-				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.queue.Len())
+			if esController.serviceQueue.Len() != tc.expectedQueueLen {
+				t.Errorf("Expected %d services to be queued, got %d", tc.expectedQueueLen, esController.serviceQueue.Len())
 			}
 		})
 	}
@@ -2007,8 +2007,11 @@ func TestUpdateNode(t *testing.T) {
 	logger, _ := ktesting.NewTestContext(t)
 	esController.nodeStore.Add(node1)
 	esController.nodeStore.Add(node2)
-	esController.addNode(logger, node1)
-	esController.addNode(logger, node2)
+	esController.addNode()
+	esController.addNode()
+	assert.Equal(t, 1, esController.topologyQueue.Len())
+	esController.processNextTopologyWorkItem(logger)
+	assert.Equal(t, 0, esController.topologyQueue.Len())
 	// The Nodes don't have the zone label, AddHints should fail.
 	_, _, eventsBuilders := esController.topologyCache.AddHints(logger, sliceInfo)
 	require.Len(t, eventsBuilders, 1)
@@ -2022,8 +2025,11 @@ func TestUpdateNode(t *testing.T) {
 	// After adding the zone label to the Nodes and calling the event handler updateNode, AddHints should succeed.
 	esController.nodeStore.Update(updateNode1)
 	esController.nodeStore.Update(updateNode2)
-	esController.updateNode(logger, node1, updateNode1)
-	esController.updateNode(logger, node2, updateNode2)
+	esController.updateNode(node1, updateNode1)
+	esController.updateNode(node2, updateNode2)
+	assert.Equal(t, 1, esController.topologyQueue.Len())
+	esController.processNextTopologyWorkItem(logger)
+	assert.Equal(t, 0, esController.topologyQueue.Len())
 	_, _, eventsBuilders = esController.topologyCache.AddHints(logger, sliceInfo)
 	require.Len(t, eventsBuilders, 1)
 	assert.Contains(t, eventsBuilders[0].Message, topologycache.TopologyAwareHintsEnabled)
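
Illustrative note (not part of the patch): the node event handlers above no
longer call checkNodeTopologyDistribution inline; each add/update/delete just
enqueues the constant key topologyQueueItemKey, and a single worker drains the
queue. Because a workqueue deduplicates pending items, a burst of node events
collapses into at most a few topology recomputations, and the informer
handlers return immediately. Below is a minimal, self-contained sketch of that
coalescing pattern, assuming client-go v0.30+ for the typed workqueue API the
patch itself uses; singleItemKey and the printed message are illustrative
stand-ins, not names from the patch.

	package main

	import (
		"fmt"
		"time"

		"k8s.io/client-go/util/workqueue"
	)

	// singleItemKey plays the role of topologyQueueItemKey: every event adds
	// the same key, so the queue never holds more than one pending item no
	// matter how many events fire.
	const singleItemKey = "refresh"

	func main() {
		q := workqueue.NewTypedRateLimitingQueue[string](
			workqueue.DefaultTypedControllerRateLimiter[string](),
		)

		done := make(chan struct{})
		go func() {
			defer close(done)
			for {
				key, quit := q.Get()
				if quit {
					return
				}
				// One recomputation services an entire burst of events,
				// standing in for checkNodeTopologyDistribution.
				fmt.Println("recomputing topology for key:", key)
				q.Done(key)
			}
		}()

		// Simulate a burst of node add/update/delete notifications. All 100
		// enqueues coalesce into only a handful of recomputations.
		for i := 0; i < 100; i++ {
			q.Add(singleItemKey)
		}

		time.Sleep(100 * time.Millisecond) // let the worker drain the queue
		q.ShutDown()
		<-done
	}

The same dedup-by-constant-key approach generalizes to any controller whose
work is "recompute global state" rather than "sync object X": the queue then
behaves like a level-triggered, rate-limited dirty flag, which is in effect
what routing node events through topologyQueue achieves here.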