From aa32537e9a09ca0915321391feff06fcd34b3cd1 Mon Sep 17 00:00:00 2001
From: xigang
Date: Sat, 1 Mar 2025 20:43:23 +0800
Subject: [PATCH] Add workqueue for node updates in DaemonSetController

Signed-off-by: xigang
---
 pkg/controller/daemon/daemon_controller.go   | 129 ++++++++++++++----
 .../daemon/daemon_controller_test.go         |  40 +++++-
 2 files changed, 140 insertions(+), 29 deletions(-)

diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go
index 0076ca2411a..ca2c5d6ee6e 100644
--- a/pkg/controller/daemon/daemon_controller.go
+++ b/pkg/controller/daemon/daemon_controller.go
@@ -113,6 +113,8 @@ type DaemonSetsController struct {
 	historyStoreSynced cache.InformerSynced
 	// podLister get list/get pods from the shared informers's store
 	podLister corelisters.PodLister
+	// podIndexer allows looking up pods by node name.
+	podIndexer cache.Indexer
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced cache.InformerSynced
@@ -125,6 +127,10 @@ type DaemonSetsController struct {
 	// DaemonSet keys that need to be synced.
 	queue workqueue.TypedRateLimitingInterface[string]
 
+	// nodeUpdateQueue is a workqueue that processes node updates to ensure DaemonSets
+	// are properly reconciled when node properties change.
+	nodeUpdateQueue workqueue.TypedRateLimitingInterface[string]
+
 	failedPodsBackoff *flowcontrol.Backoff
 }
 
@@ -159,6 +165,12 @@ func NewDaemonSetsController(
 				Name: "daemonset",
 			},
 		),
+		nodeUpdateQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			workqueue.DefaultTypedControllerRateLimiter[string](),
+			workqueue.TypedRateLimitingQueueConfig[string]{
+				Name: "daemonset-node-updates",
+			},
+		),
 	}
 
 	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -204,6 +216,10 @@ func NewDaemonSetsController(
 	})
 	dsc.podLister = podInformer.Lister()
 	dsc.podStoreSynced = podInformer.Informer().HasSynced
+	if err := controller.AddPodNodeNameIndexer(podInformer.Informer()); err != nil {
+		return nil, fmt.Errorf("failed to add pod node name indexer: %w", err)
+	}
+	dsc.podIndexer = podInformer.Informer().GetIndexer()
 
 	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
@@ -289,6 +305,7 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
 	defer dsc.eventBroadcaster.Shutdown()
 
 	defer dsc.queue.ShutDown()
+	defer dsc.nodeUpdateQueue.ShutDown()
 
 	logger := klog.FromContext(ctx)
 	logger.Info("Starting daemon sets controller")
@@ -300,6 +317,7 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
 
 	for i := 0; i < workers; i++ {
 		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
+		go wait.UntilWithContext(ctx, dsc.runNodeUpdateWorker, time.Second)
 	}
 
 	go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())
@@ -643,18 +661,14 @@ func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{})
 }
 
 func (dsc *DaemonSetsController) addNode(logger klog.Logger, obj interface{}) {
-	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
-	dsList, err := dsc.dsLister.List(labels.Everything())
-	if err != nil {
-		logger.V(4).Info("Error enqueueing daemon sets", "err", err)
+	node, ok := obj.(*v1.Node)
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("couldn't get node from object %#v", obj))
 		return
 	}
-	node := obj.(*v1.Node)
-	for _, ds := range dsList {
-		if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); shouldRun {
-			dsc.enqueueDaemonSet(ds)
-		}
-	}
+
+	logger.V(4).Info("Queuing node addition", "node", klog.KObj(node))
+	dsc.nodeUpdateQueue.Add(node.Name)
 }
 
 // shouldIgnoreNodeUpdate returns true if Node labels and taints have not changed, otherwise returns false.
@@ -667,24 +681,13 @@ func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interface{}) {
 	oldNode := old.(*v1.Node)
 	curNode := cur.(*v1.Node)
+
 	if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
 		return
 	}
-	dsList, err := dsc.dsLister.List(labels.Everything())
-	if err != nil {
-		logger.V(4).Info("Error listing daemon sets", "err", err)
-		return
-	}
-	// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
-	for _, ds := range dsList {
-		// If NodeShouldRunDaemonPod needs to uses other than Labels and Taints (mutable) properties of node, it needs to update shouldIgnoreNodeUpdate.
-		oldShouldRun, oldShouldContinueRunning := NodeShouldRunDaemonPod(oldNode, ds)
-		currentShouldRun, currentShouldContinueRunning := NodeShouldRunDaemonPod(curNode, ds)
-		if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
-			dsc.enqueueDaemonSet(ds)
-		}
-	}
+
+	logger.V(4).Info("Queuing node update", "node", klog.KObj(curNode))
+	dsc.nodeUpdateQueue.Add(curNode.Name)
 }
 
 // getDaemonPods returns daemon pods owned by the given ds.
@@ -1376,3 +1379,81 @@ func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods
 	return results
 }
+
+// runNodeUpdateWorker is a worker that processes node updates from the nodeUpdateQueue.
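+// For each queued node it re-evaluates every DaemonSet against that node and enqueues
+// only the DaemonSets whose desired state disagrees with the pods scheduled there.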
+func (dsc *DaemonSetsController) runNodeUpdateWorker(ctx context.Context) {
+	for dsc.processNextNodeUpdate(ctx) {
+	}
+}
+
+func (dsc *DaemonSetsController) processNextNodeUpdate(ctx context.Context) bool {
+	nodeName, quit := dsc.nodeUpdateQueue.Get()
+	if quit {
+		return false
+	}
+	defer dsc.nodeUpdateQueue.Done(nodeName)
+
+	err := dsc.syncNodeUpdate(ctx, nodeName)
+	if err == nil {
+		dsc.nodeUpdateQueue.Forget(nodeName)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with: %w", nodeName, err))
+	dsc.nodeUpdateQueue.AddRateLimited(nodeName)
+
+	return true
+}
+
+func (dsc *DaemonSetsController) syncNodeUpdate(ctx context.Context, nodeName string) error {
+	logger := klog.FromContext(ctx)
+
+	node, err := dsc.nodeLister.Get(nodeName)
+	if apierrors.IsNotFound(err) {
+		logger.V(3).Info("Node not found, skipping update", "node", nodeName)
+		return nil
+	}
+	if err != nil {
+		return fmt.Errorf("error getting node %s: %w", nodeName, err)
+	}
+
+	dsList, err := dsc.dsLister.List(labels.Everything())
+	if err != nil {
+		return fmt.Errorf("error listing daemon sets: %w", err)
+	}
+
+	podsOnNode, err := dsc.podIndexer.ByIndex(controller.PodNodeNameKeyIndex, nodeName)
+	if err != nil {
+		return fmt.Errorf("error getting pods by node name: %w", err)
+	}
+
+	podsByDS := make(map[string][]*v1.Pod)
+	for _, obj := range podsOnNode {
+		pod := obj.(*v1.Pod)
+		controllerRef := metav1.GetControllerOf(pod)
+		if controllerRef == nil || controllerRef.Kind != controllerKind.Kind {
+			continue
+		}
+		dsKey := cache.NewObjectName(pod.Namespace, controllerRef.Name).String()
+		podsByDS[dsKey] = append(podsByDS[dsKey], pod)
+	}
+
+	for _, ds := range dsList {
+		shouldRun, shouldContinueRunning := NodeShouldRunDaemonPod(node, ds)
+
+		dsKey, err := controller.KeyFunc(ds)
+		if err != nil {
+			return fmt.Errorf("error getting key for object %#v: %w", ds, err)
+		}
+		daemonPods := podsByDS[dsKey]
+		scheduled := len(daemonPods) > 0
+
+		if (shouldRun && !scheduled) || (!shouldContinueRunning && scheduled) {
+			dsc.enqueueDaemonSet(ds)
+		}
+	}
+
+	return nil
+}
diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go
index 7aa832d2cb8..bf25f34d55d 100644
--- a/pkg/controller/daemon/daemon_controller_test.go
+++ b/pkg/controller/daemon/daemon_controller_test.go
@@ -2494,6 +2494,13 @@ func TestUpdateNode(t *testing.T) {
 				t.Fatal(err)
 			}
 
+			manager.nodeUpdateQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
+				workqueue.DefaultTypedControllerRateLimiter[string](),
+				workqueue.TypedRateLimitingQueueConfig[string]{
+					Name: "test-daemon-node-updates",
+				},
+			)
+
 			expectedEvents := 0
 			if c.expectedEventsFunc != nil {
 				expectedEvents = c.expectedEventsFunc(strategy.Type)
@@ -2510,8 +2517,20 @@
 				}
 			}
 
+			err = manager.nodeStore.Add(c.newNode)
+			if err != nil {
+				t.Fatal(err)
+			}
+
 			enqueued = false
 			manager.updateNode(logger, c.oldNode, c.newNode)
+
+			nodeKeys := getQueuedKeys(manager.nodeUpdateQueue)
+			for _, key := range nodeKeys {
+				if err := manager.syncNodeUpdate(ctx, key); err != nil {
+					t.Fatal(err)
+				}
+			}
 			if enqueued != c.shouldEnqueue {
 				t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
 			}
@@ -2880,18 +2899,29 @@
 		t.Fatal(err)
 	}
 	manager.addNode(logger, node1)
-	if got, want := manager.queue.Len(), 0; got != want {
+	if got, want := manager.nodeUpdateQueue.Len(), 1; got != want {
 		t.Fatalf("queue.Len() = %v, want %v", got, want)
 	}
+	key, done := manager.nodeUpdateQueue.Get()
+	if done {
+		t.Fatal("failed to get item from nodeUpdateQueue")
+	}
+	if key != node1.Name {
+		t.Fatalf("expected node name %v, got %v", node1.Name, key)
+	}
+	manager.nodeUpdateQueue.Done(key)
 
 	node2 := newNode("node2", simpleNodeLabel)
 	manager.addNode(logger, node2)
-	if got, want := manager.queue.Len(), 1; got != want {
+	if got, want := manager.nodeUpdateQueue.Len(), 1; got != want {
 		t.Fatalf("queue.Len() = %v, want %v", got, want)
 	}
-	key, done := manager.queue.Get()
-	if key == "" || done {
-		t.Fatalf("failed to enqueue controller for node %v", node2.Name)
+	key, done = manager.nodeUpdateQueue.Get()
+	if done {
+		t.Fatal("failed to get item from nodeUpdateQueue")
+	}
+	if key != node2.Name {
+		t.Fatalf("expected node name %v, got %v", node2.Name, key)
 	}
 }