From 4ec8cf08dadabbe3bce49831bfc9405dc785c018 Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Thu, 14 Jul 2022 18:00:44 +0200 Subject: [PATCH] This PR refactors taint_manager to eliminate the getPod and getNode stubs. --- .../node_lifecycle_controller.go | 7 +- .../nodelifecycle/scheduler/taint_manager.go | 21 ++-- .../scheduler/taint_manager_test.go | 103 ++++++------------ 3 files changed, 45 insertions(+), 86 deletions(-) diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index 4394f45ff80..fa3ab99d923 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -475,12 +475,10 @@ func NewNodeLifecycleController( return pods, nil } nc.podLister = podInformer.Lister() + nc.nodeLister = nodeInformer.Lister() if nc.runTaintManager { - podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) } - nodeLister := nodeInformer.Lister() - nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) } - nc.taintManager = scheduler.NewNoExecuteTaintManager(ctx, kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode) + nc.taintManager = scheduler.NewNoExecuteTaintManager(ctx, kubeClient, nc.podLister, nc.nodeLister, nc.getPodsAssignedToNode) nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(nil, node) @@ -518,7 +516,6 @@ func NewNodeLifecycleController( nc.leaseLister = leaseInformer.Lister() nc.leaseInformerSynced = leaseInformer.Informer().HasSynced - nc.nodeLister = nodeInformer.Lister() nc.nodeInformerSynced = nodeInformer.Informer().HasSynced nc.daemonSetStore = daemonSetInformer.Lister() diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go index 750a9390819..243a1a52eff 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -33,6 +33,7 @@ import ( clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/apis/core/helper" @@ -69,12 +70,6 @@ func hash(val string, max int) int { return int(hasher.Sum32() % uint32(max)) } -// GetPodFunc returns the pod for the specified name/namespace, or a NotFound error if missing. -type GetPodFunc func(name, namespace string) (*v1.Pod, error) - -// GetNodeFunc returns the node for the specified name, or a NotFound error if missing. -type GetNodeFunc func(name string) (*v1.Node, error) - // GetPodsByNodeNameFunc returns the list of pods assigned to the specified node. type GetPodsByNodeNameFunc func(nodeName string) ([]*v1.Pod, error) @@ -84,8 +79,8 @@ type NoExecuteTaintManager struct { client clientset.Interface broadcaster record.EventBroadcaster recorder record.EventRecorder - getPod GetPodFunc - getNode GetNodeFunc + podLister corelisters.PodLister + nodeLister corelisters.NodeLister getPodsAssignedToNode GetPodsByNodeNameFunc taintEvictionQueue *TimedWorkerQueue @@ -156,7 +151,7 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration { // NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to // communicate with the API server. -func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager { +func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, podLister corelisters.PodLister, nodeLister corelisters.NodeLister, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"}) @@ -164,8 +159,8 @@ func NewNoExecuteTaintManager(ctx context.Context, c clientset.Interface, getPod client: c, broadcaster: eventBroadcaster, recorder: recorder, - getPod: getPod, - getNode: getNode, + podLister: podLister, + nodeLister: nodeLister, getPodsAssignedToNode: getPodsAssignedToNode, taintedNodes: make(map[string][]v1.Taint), @@ -388,7 +383,7 @@ func (tc *NoExecuteTaintManager) processPodOnNode( } func (tc *NoExecuteTaintManager) handlePodUpdate(ctx context.Context, podUpdate podUpdateItem) { - pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace) + pod, err := tc.podLister.Pods(podUpdate.podNamespace).Get(podUpdate.podName) if err != nil { if apierrors.IsNotFound(err) { // Delete @@ -428,7 +423,7 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(ctx context.Context, podUpdate } func (tc *NoExecuteTaintManager) handleNodeUpdate(ctx context.Context, nodeUpdate nodeUpdateItem) { - node, err := tc.getNode(nodeUpdate.nodeName) + node, err := tc.nodeLister.Get(nodeUpdate.nodeName) if err != nil { if apierrors.IsNotFound(err) { // Delete diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go index b67d6dc6226..71803915090 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "sort" - "sync" "testing" "time" @@ -28,21 +27,17 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/controller/testutil" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - clienttesting "k8s.io/client-go/testing" ) var timeForControllerToProgress = 500 * time.Millisecond -func getPodFromClientset(clientset *fake.Clientset) GetPodFunc { - return func(name, namespace string) (*v1.Pod, error) { - return clientset.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) - } -} - func getPodsAssignedToNode(c *fake.Clientset) GetPodsByNodeNameFunc { return func(nodeName string) ([]*v1.Pod, error) { selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}) @@ -61,46 +56,6 @@ func getPodsAssignedToNode(c *fake.Clientset) GetPodsByNodeNameFunc { } } -func getNodeFromClientset(clientset *fake.Clientset) GetNodeFunc { - return func(name string) (*v1.Node, error) { - return clientset.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) - } -} - -type podHolder struct { - pod *v1.Pod - sync.Mutex -} - -func (p *podHolder) getPod(name, namespace string) (*v1.Pod, error) { - p.Lock() - defer p.Unlock() - return p.pod, nil -} -func (p *podHolder) setPod(pod *v1.Pod) { - p.Lock() - defer p.Unlock() - p.pod = pod -} - -type nodeHolder struct { - lock sync.Mutex - - node *v1.Node -} - -func (n *nodeHolder) setNode(node *v1.Node) { - n.lock.Lock() - defer n.lock.Unlock() - n.node = node -} - -func (n *nodeHolder) getNode(name string) (*v1.Node, error) { - n.lock.Lock() - defer n.lock.Unlock() - return n.node, nil -} - func createNoExecuteTaint(index int) v1.Taint { now := metav1.Now() return v1.Taint{ @@ -133,6 +88,14 @@ func addTaintsToNode(node *v1.Node, key, value string, indices []int) *v1.Node { return node } +func setupNewNoExecuteTaintManager(ctx context.Context, fakeClientSet *fake.Clientset) (*NoExecuteTaintManager, cache.Indexer, cache.Indexer) { + informerFactory := informers.NewSharedInformerFactory(fakeClientSet, 0) + podIndexer := informerFactory.Core().V1().Pods().Informer().GetIndexer() + nodeIndexer := informerFactory.Core().V1().Nodes().Informer().GetIndexer() + mgr := NewNoExecuteTaintManager(ctx, fakeClientSet, informerFactory.Core().V1().Pods().Lister(), informerFactory.Core().V1().Nodes().Lister(), getPodsAssignedToNode(fakeClientSet)) + return mgr, podIndexer, nodeIndexer +} + type timestampedPod struct { names []string timestamp time.Duration @@ -219,10 +182,12 @@ func TestCreatePod(t *testing.T) { for _, item := range testCases { ctx, cancel := context.WithCancel(context.Background()) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(ctx, fakeClientset, (&podHolder{pod: item.pod}).getPod, getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) + controller, podIndexer, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset) controller.recorder = testutil.NewFakeRecorder() go controller.Run(ctx) controller.taintedNodes = item.taintedNodes + + podIndexer.Add(item.pod) controller.PodUpdated(nil, item.pod) // wait a bit time.Sleep(timeForControllerToProgress) @@ -243,7 +208,7 @@ func TestCreatePod(t *testing.T) { func TestDeletePod(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(context.TODO(), fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) + controller, _, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller.recorder = testutil.NewFakeRecorder() go controller.Run(context.TODO()) controller.taintedNodes = map[string][]v1.Taint{ @@ -306,17 +271,17 @@ func TestUpdatePod(t *testing.T) { for _, item := range testCases { ctx, cancel := context.WithCancel(context.Background()) fakeClientset := fake.NewSimpleClientset() - holder := &podHolder{} - controller := NewNoExecuteTaintManager(ctx, fakeClientset, holder.getPod, getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) + controller, podIndexer, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller.recorder = testutil.NewFakeRecorder() go controller.Run(ctx) controller.taintedNodes = item.taintedNodes - holder.setPod(item.prevPod) + podIndexer.Add(item.prevPod) controller.PodUpdated(nil, item.prevPod) + fakeClientset.ClearActions() time.Sleep(timeForControllerToProgress) - holder.setPod(item.newPod) + podIndexer.Update(item.newPod) controller.PodUpdated(item.prevPod, item.newPod) // wait a bit time.Sleep(timeForControllerToProgress) @@ -373,7 +338,8 @@ func TestCreateNode(t *testing.T) { for _, item := range testCases { ctx, cancel := context.WithCancel(context.Background()) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - controller := NewNoExecuteTaintManager(ctx, fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{node: item.node}).getNode, getPodsAssignedToNode(fakeClientset)) + controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) + nodeIndexer.Add(item.node) controller.recorder = testutil.NewFakeRecorder() go controller.Run(ctx) controller.NodeUpdated(nil, item.node) @@ -396,7 +362,7 @@ func TestCreateNode(t *testing.T) { func TestDeleteNode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(ctx, fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) + controller, _, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller.recorder = testutil.NewFakeRecorder() controller.taintedNodes = map[string][]v1.Taint{ "node1": {createNoExecuteTaint(1)}, @@ -494,7 +460,8 @@ func TestUpdateNode(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - controller := NewNoExecuteTaintManager(context.TODO(), fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{node: item.newNode}).getNode, getPodsAssignedToNode(fakeClientset)) + controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) + nodeIndexer.Add(item.newNode) controller.recorder = testutil.NewFakeRecorder() go controller.Run(context.TODO()) controller.NodeUpdated(item.oldNode, item.newNode) @@ -539,13 +506,12 @@ func TestUpdateNodeWithMultipleTaints(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) fakeClientset := fake.NewSimpleClientset(pod) - holder := &nodeHolder{node: untaintedNode} - controller := NewNoExecuteTaintManager(context.TODO(), fakeClientset, getPodFromClientset(fakeClientset), (holder).getNode, getPodsAssignedToNode(fakeClientset)) + controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) controller.recorder = testutil.NewFakeRecorder() go controller.Run(context.TODO()) // no taint - holder.setNode(untaintedNode) + nodeIndexer.Add(untaintedNode) controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) // verify pod is not queued for deletion if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil { @@ -553,7 +519,7 @@ func TestUpdateNodeWithMultipleTaints(t *testing.T) { } // no taint -> infinitely tolerated taint - holder.setNode(singleTaintedNode) + nodeIndexer.Update(singleTaintedNode) controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) // verify pod is not queued for deletion if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil { @@ -561,7 +527,7 @@ func TestUpdateNodeWithMultipleTaints(t *testing.T) { } // infinitely tolerated taint -> temporarily tolerated taint - holder.setNode(doubleTaintedNode) + nodeIndexer.Update(doubleTaintedNode) controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) // verify pod is queued for deletion if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) == nil { @@ -569,7 +535,7 @@ func TestUpdateNodeWithMultipleTaints(t *testing.T) { } // temporarily tolerated taint -> infinitely tolerated taint - holder.setNode(singleTaintedNode) + nodeIndexer.Update(singleTaintedNode) controller.handleNodeUpdate(ctx, nodeUpdateItem{"node1"}) // verify pod is not queued for deletion if controller.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String()) != nil { @@ -628,7 +594,8 @@ func TestUpdateNodeWithMultiplePods(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) sort.Sort(item.expectedDeleteTimes) - controller := NewNoExecuteTaintManager(context.TODO(), fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{node: item.newNode}).getNode, getPodsAssignedToNode(fakeClientset)) + controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) + nodeIndexer.Add(item.newNode) controller.recorder = testutil.NewFakeRecorder() go controller.Run(context.TODO()) controller.NodeUpdated(item.oldNode, item.newNode) @@ -827,13 +794,13 @@ func TestEventualConsistency(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - holder := &podHolder{} - controller := NewNoExecuteTaintManager(context.TODO(), fakeClientset, holder.getPod, (&nodeHolder{node: item.newNode}).getNode, getPodsAssignedToNode(fakeClientset)) + controller, podIndexer, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) + nodeIndexer.Add(item.newNode) controller.recorder = testutil.NewFakeRecorder() go controller.Run(context.TODO()) if item.prevPod != nil { - holder.setPod(item.prevPod) + podIndexer.Add(item.prevPod) controller.PodUpdated(nil, item.prevPod) } @@ -854,7 +821,7 @@ func TestEventualConsistency(t *testing.T) { fakeClientset.ClearActions() // And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well. - holder.setPod(item.newPod) + podIndexer.Update(item.newPod) controller.PodUpdated(item.prevPod, item.newPod) // wait a bit time.Sleep(timeForControllerToProgress)