Add workqueue for node updates in DaemonSetController

Signed-off-by: xigang <wangxigang2014@gmail.com>
xigang 2025-03-01 20:43:23 +08:00
parent 473ec01548
commit aa32537e9a
2 changed files with 134 additions and 29 deletions


@@ -113,6 +113,8 @@ type DaemonSetsController struct {
historyStoreSynced cache.InformerSynced
// podLister can list/get pods from the shared informer's store
podLister corelisters.PodLister
// podIndexer allows looking up pods by node name.
podIndexer cache.Indexer
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
@@ -125,6 +127,10 @@ type DaemonSetsController struct {
// DaemonSet keys that need to be synced.
queue workqueue.TypedRateLimitingInterface[string]
// nodeUpdateQueue is a workqueue that processes node updates to ensure DaemonSets
// are properly reconciled when node properties change.
nodeUpdateQueue workqueue.TypedRateLimitingInterface[string]
failedPodsBackoff *flowcontrol.Backoff
}
@@ -159,6 +165,12 @@ func NewDaemonSetsController(
Name: "daemonset",
},
),
nodeUpdateQueue: workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "daemonset-node-updates",
},
),
}
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -204,6 +216,8 @@ func NewDaemonSetsController(
})
dsc.podLister = podInformer.Lister()
dsc.podStoreSynced = podInformer.Informer().HasSynced
controller.AddPodNodeNameIndexer(podInformer.Informer())
dsc.podIndexer = podInformer.Informer().GetIndexer()
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@@ -289,6 +303,7 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
defer dsc.eventBroadcaster.Shutdown()
defer dsc.queue.ShutDown()
defer dsc.nodeUpdateQueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting daemon sets controller")
@@ -300,6 +315,7 @@ func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
go wait.UntilWithContext(ctx, dsc.runNodeUpdateWorker, time.Second)
}
go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())
@@ -643,18 +659,14 @@ func (dsc *DaemonSetsController) deletePod(logger klog.Logger, obj interface{})
}
func (dsc *DaemonSetsController) addNode(logger klog.Logger, obj interface{}) {
- // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
- dsList, err := dsc.dsLister.List(labels.Everything())
- if err != nil {
- logger.V(4).Info("Error enqueueing daemon sets", "err", err)
+ node, ok := obj.(*v1.Node)
+ if !ok {
+ utilruntime.HandleError(fmt.Errorf("couldn't get node from object %#v", obj))
return
}
- node := obj.(*v1.Node)
- for _, ds := range dsList {
- if shouldRun, _ := NodeShouldRunDaemonPod(node, ds); shouldRun {
- dsc.enqueueDaemonSet(ds)
- }
- }
+ logger.V(4).Info("Queuing node addition", "node", klog.KObj(node))
+ dsc.nodeUpdateQueue.Add(node.Name)
}
// shouldIgnoreNodeUpdate returns true if Node labels and taints have not changed, otherwise returns false.
@@ -667,24 +679,13 @@ func shouldIgnoreNodeUpdate(oldNode, curNode v1.Node) bool {
func (dsc *DaemonSetsController) updateNode(logger klog.Logger, old, cur interface{}) {
oldNode := old.(*v1.Node)
curNode := cur.(*v1.Node)
if shouldIgnoreNodeUpdate(*oldNode, *curNode) {
return
}
- dsList, err := dsc.dsLister.List(labels.Everything())
- if err != nil {
- logger.V(4).Info("Error listing daemon sets", "err", err)
- return
- }
- // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
- for _, ds := range dsList {
- // If NodeShouldRunDaemonPod needs to uses other than Labels and Taints (mutable) properties of node, it needs to update shouldIgnoreNodeUpdate.
- oldShouldRun, oldShouldContinueRunning := NodeShouldRunDaemonPod(oldNode, ds)
- currentShouldRun, currentShouldContinueRunning := NodeShouldRunDaemonPod(curNode, ds)
- if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
- dsc.enqueueDaemonSet(ds)
- }
- }
+ logger.V(4).Info("Queuing node update", "node", klog.KObj(curNode))
+ dsc.nodeUpdateQueue.Add(curNode.Name)
}
// getDaemonPods returns daemon pods owned by the given ds.
@@ -1376,3 +1377,79 @@ func getUnscheduledPodsWithoutNode(runningNodesList []*v1.Node, nodeToDaemonPods
return results
}
// runNodeUpdateWorker is a worker that processes node updates from the nodeUpdateQueue.
func (dsc *DaemonSetsController) runNodeUpdateWorker(ctx context.Context) {
for dsc.processNextNodeUpdate(ctx) {
}
}
func (dsc *DaemonSetsController) processNextNodeUpdate(ctx context.Context) bool {
nodeName, quit := dsc.nodeUpdateQueue.Get()
if quit {
return false
}
defer dsc.nodeUpdateQueue.Done(nodeName)
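// Sync the node; on success, Forget clears the rate limiter's tracking for this key, while a failure requeues it with backoff.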
err := dsc.syncNodeUpdate(ctx, nodeName)
if err == nil {
dsc.nodeUpdateQueue.Forget(nodeName)
return true
}
utilruntime.HandleError(fmt.Errorf("%v failed with: %w", nodeName, err))
dsc.nodeUpdateQueue.AddRateLimited(nodeName)
return true
}
func (dsc *DaemonSetsController) syncNodeUpdate(ctx context.Context, nodeName string) error {
logger := klog.FromContext(ctx)
node, err := dsc.nodeLister.Get(nodeName)
if apierrors.IsNotFound(err) {
logger.V(3).Info("Node not found, skipping update", "node", nodeName)
return nil
}
if err != nil {
return fmt.Errorf("error getting node %s: %w", nodeName, err)
}
dsList, err := dsc.dsLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("error listing daemon sets: %w", err)
}
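// Fetch only the pods bound to this node via the pod node-name index, rather than listing every pod in the cluster.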
podsOnNode, err := dsc.podIndexer.ByIndex(controller.PodNodeNameKeyIndex, nodeName)
if err != nil {
return fmt.Errorf("error getting pods by node name: %w", err)
}
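// Group this node's daemon pods by the namespace/name key of their controlling DaemonSet.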
podsByDS := make(map[string][]*v1.Pod)
for _, obj := range podsOnNode {
pod := obj.(*v1.Pod)
controllerRef := metav1.GetControllerOf(pod)
if controllerRef == nil || controllerRef.Kind != controllerKind.Kind {
continue
}
dsKey := cache.NewObjectName(pod.Namespace, controllerRef.Name).String()
podsByDS[dsKey] = append(podsByDS[dsKey], pod)
}
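// Enqueue only those DaemonSets whose desired state on this node disagrees with what is actually scheduled there.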
for _, ds := range dsList {
shouldRun, shouldContinueRunning := NodeShouldRunDaemonPod(node, ds)
dsKey, err := controller.KeyFunc(ds)
if err != nil {
return fmt.Errorf("error getting key for object %#v: %w", ds, err)
}
daemonPods := podsByDS[dsKey]
scheduled := len(daemonPods) > 0
if (shouldRun && !scheduled) || (!shouldContinueRunning && scheduled) {
dsc.enqueueDaemonSet(ds)
}
}
return nil
}

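The sync path above looks up pods through an index keyed by node name, registered with controller.AddPodNodeNameIndexer and queried via controller.PodNodeNameKeyIndex; neither helper is shown in this diff. A minimal sketch of what such an indexer typically looks like with client-go follows (the package, index name, and function below are illustrative assumptions, not the controller package's actual implementation):

package daemonutil // hypothetical package used only for this sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// podNodeNameIndex is a placeholder index name for this sketch.
const podNodeNameIndex = "spec.nodeName"

// addPodNodeNameIndexer registers an index mapping a node name to the pods
// scheduled on that node, so callers can use
// indexer.ByIndex(podNodeNameIndex, nodeName) instead of listing all pods.
func addPodNodeNameIndexer(podInformer cache.SharedIndexInformer) error {
	return podInformer.AddIndexers(cache.Indexers{
		podNodeNameIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || pod.Spec.NodeName == "" {
				return nil, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	})
}

With an index like this in place, the ByIndex call in syncNodeUpdate returns only the pods on the node being processed, so the cost of handling a node event scales with that node's pods rather than with the whole cluster.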

@@ -2494,6 +2494,13 @@ func TestUpdateNode(t *testing.T) {
t.Fatal(err)
}
manager.nodeUpdateQueue = workqueue.NewTypedRateLimitingQueueWithConfig(
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{
Name: "test-daemon-node-updates",
},
)
expectedEvents := 0
if c.expectedEventsFunc != nil {
expectedEvents = c.expectedEventsFunc(strategy.Type)
@@ -2510,8 +2517,18 @@
}
}
err = manager.nodeStore.Add(c.newNode)
if err != nil {
t.Fatal(err)
}
enqueued = false
manager.updateNode(logger, c.oldNode, c.newNode)
nodeKeys := getQueuedKeys(manager.nodeUpdateQueue)
for _, key := range nodeKeys {
manager.syncNodeUpdate(ctx, key)
}
if enqueued != c.shouldEnqueue {
t.Errorf("Test case: '%s', expected: %t, got: %t", c.test, c.shouldEnqueue, enqueued)
}
@@ -2880,18 +2897,29 @@ func TestAddNode(t *testing.T) {
t.Fatal(err)
}
manager.addNode(logger, node1)
- if got, want := manager.queue.Len(), 0; got != want {
+ if got, want := manager.nodeUpdateQueue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
+ key, done := manager.nodeUpdateQueue.Get()
+ if done {
+ t.Fatal("failed to get item from nodeUpdateQueue")
+ }
+ if key != node1.Name {
+ t.Fatalf("expected node name %v, got %v", node1.Name, key)
+ }
+ manager.nodeUpdateQueue.Done(key)
node2 := newNode("node2", simpleNodeLabel)
manager.addNode(logger, node2)
- if got, want := manager.queue.Len(), 1; got != want {
+ if got, want := manager.nodeUpdateQueue.Len(), 1; got != want {
t.Fatalf("queue.Len() = %v, want %v", got, want)
}
- key, done := manager.queue.Get()
- if key == "" || done {
- t.Fatalf("failed to enqueue controller for node %v", node2.Name)
+ key, done = manager.nodeUpdateQueue.Get()
+ if done {
+ t.Fatal("failed to get item from nodeUpdateQueue")
+ }
+ if key != node2.Name {
+ t.Fatalf("expected node name %v, got %v", node2.Name, key)
}
}
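
TestUpdateNode above drains the node-update queue through a getQueuedKeys helper that is not included in this diff. A minimal sketch of what it might look like, assuming it simply pops and acknowledges every pending key:

package daemon // assumed to sit alongside the controller tests

import "k8s.io/client-go/util/workqueue"

// getQueuedKeys drains the queue and returns the keys that were pending.
// This is a hypothetical reconstruction for illustration only; the helper the
// test actually uses is not shown in the diff.
func getQueuedKeys(queue workqueue.TypedRateLimitingInterface[string]) []string {
	var keys []string
	for queue.Len() > 0 {
		key, shutdown := queue.Get()
		if shutdown {
			break
		}
		keys = append(keys, key)
		queue.Done(key)
	}
	return keys
}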