From 98e9c78c132a6087b34e66229eb590cf2f2d5176 Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Tue, 25 Jun 2019 16:11:47 +0200 Subject: [PATCH] Migrate TaintManager to use watch for listing pods instead of expensive listing pods call. --- .../node_lifecycle_controller.go | 34 ++++- pkg/controller/nodelifecycle/scheduler/BUILD | 4 +- .../nodelifecycle/scheduler/taint_manager.go | 56 +++---- .../scheduler/taint_manager_test.go | 140 +++++++++++++++++- 4 files changed, 190 insertions(+), 44 deletions(-) diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go index 68647db16f6..b0ebcc6df90 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go @@ -126,7 +126,8 @@ const ( const ( // The amount of time the nodecontroller should sleep between retrying node health updates - retrySleepTime = 20 * time.Millisecond + retrySleepTime = 20 * time.Millisecond + nodeNameKeyIndex = "spec.nodeName" ) // labelReconcileInfo lists Node labels to reconcile, and how to reconcile them. @@ -364,11 +365,40 @@ func NewNodeLifecycleController( nc.podInformerSynced = podInformer.Informer().HasSynced if nc.runTaintManager { + podInformer.Informer().AddIndexers(cache.Indexers{ + nodeNameKeyIndex: func(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return []string{}, nil + } + if len(pod.Spec.NodeName) == 0 { + return []string{}, nil + } + return []string{pod.Spec.NodeName}, nil + }, + }) + + podIndexer := podInformer.Informer().GetIndexer() podLister := podInformer.Lister() podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) } + podByNodeNameLister := func(nodeName string) ([]v1.Pod, error) { + objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName) + if err != nil { + return nil, err + } + pods := make([]v1.Pod, 0, len(objs)) + for _, obj := range objs { + pod, ok := obj.(*v1.Pod) + if !ok { + continue + } + pods = append(pods, *pod) + } + return pods, nil + } nodeLister := nodeInformer.Lister() nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) } - nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter) + nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, podByNodeNameLister) nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { nc.taintManager.NodeUpdated(nil, node) diff --git a/pkg/controller/nodelifecycle/scheduler/BUILD b/pkg/controller/nodelifecycle/scheduler/BUILD index ad28abb8e0a..ed3be9c03f7 100644 --- a/pkg/controller/nodelifecycle/scheduler/BUILD +++ b/pkg/controller/nodelifecycle/scheduler/BUILD @@ -15,8 +15,6 @@ go_library( "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", @@ -42,6 +40,8 @@ go_test( "//pkg/controller/testutil:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", 
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/testing:go_default_library", diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go index 6d32bedfad8..38777a7ad07 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -26,8 +26,6 @@ import ( "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientset "k8s.io/client-go/kubernetes" @@ -75,13 +73,17 @@ type GetPodFunc func(name, namespace string) (*v1.Pod, error) // GetNodeFunc returns the node for the specified name, or a NotFound error if missing. type GetNodeFunc func(name string) (*v1.Node, error) +// GetPodsByNodeNameFunc returns the list of pods assigned to the specified node. +type GetPodsByNodeNameFunc func(nodeName string) ([]v1.Pod, error) + // NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods // from Nodes tainted with NoExecute Taints. type NoExecuteTaintManager struct { - client clientset.Interface - recorder record.EventRecorder - getPod GetPodFunc - getNode GetNodeFunc + client clientset.Interface + recorder record.EventRecorder + getPod GetPodFunc + getNode GetNodeFunc + getPodsAssignedToNode GetPodsByNodeNameFunc taintEvictionQueue *TimedWorkerQueue // keeps a map from nodeName to all noExecute taints on that Node @@ -125,25 +127,6 @@ func getNoExecuteTaints(taints []v1.Taint) []v1.Taint { return result } -func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, error) { - selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}) - pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{ - FieldSelector: selector.String(), - LabelSelector: labels.Everything().String(), - }) - for i := 0; i < retries && err != nil; i++ { - pods, err = c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{ - FieldSelector: selector.String(), - LabelSelector: labels.Everything().String(), - }) - time.Sleep(100 * time.Millisecond) - } - if err != nil { - return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName) - } - return pods.Items, nil -} - // getMinTolerationTime returns minimal toleration time from the given slice, or -1 if it's infinite. func getMinTolerationTime(tolerations []v1.Toleration) time.Duration { minTolerationTime := int64(-1) @@ -167,7 +150,7 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration { // NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to // communicate with the API server. 
-func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc) *NoExecuteTaintManager { +func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"}) eventBroadcaster.StartLogging(klog.Infof) @@ -179,11 +162,12 @@ func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode } tm := &NoExecuteTaintManager{ - client: c, - recorder: recorder, - getPod: getPod, - getNode: getNode, - taintedNodes: make(map[string][]v1.Taint), + client: c, + recorder: recorder, + getPod: getPod, + getNode: getNode, + getPodsAssignedToNode: getPodsAssignedToNode, + taintedNodes: make(map[string][]v1.Taint), nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"), podUpdateQueue: workqueue.NewNamed("noexec_taint_pod"), @@ -228,6 +212,10 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) { if shutdown { break } + // Pod updates are handled by the same worker as updates for their node, which avoids races + // between the node worker setting tc.taintedNodes and the pod worker reading it to decide + // whether to delete the pod. + // It's possible that this code is still correct even without this assumption. podUpdate := item.(podUpdateItem) hash := hash(podUpdate.nodeName, UpdateWorkerSize) select { @@ -450,7 +438,11 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) { tc.taintedNodes[node.Name] = taints } }() - pods, err := getPodsAssignedToNode(tc.client, node.Name) + + // It is critical that tc.taintedNodes is updated before getPodsAssignedToNode is called: + // getPodsAssignedToNode may lag behind, as long as all future pod updates go through + // tc.PodUpdated, which uses tc.taintedNodes to delete such delayed pods when needed.
+ pods, err := tc.getPodsAssignedToNode(node.Name) if err != nil { klog.Errorf(err.Error()) return diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go index 7a8504f258a..d490fafea19 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go @@ -23,7 +23,9 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/controller/testutil" @@ -39,6 +41,20 @@ func getPodFromClientset(clientset *fake.Clientset) GetPodFunc { } } +func getPodsAssignedToNode(c *fake.Clientset) GetPodsByNodeNameFunc { + return func(nodeName string) ([]v1.Pod, error) { + selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}) + pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{ + FieldSelector: selector.String(), + LabelSelector: labels.Everything().String(), + }) + if err != nil { + return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName) + } + return pods.Items, nil + } +} + func getNodeFromClientset(clientset *fake.Clientset) GetNodeFunc { return func(name string) (*v1.Node, error) { return clientset.CoreV1().Nodes().Get(name, metav1.GetOptions{}) @@ -187,7 +203,7 @@ func TestCreatePod(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(fakeClientset, (&podHolder{pod: item.pod}).getPod, getNodeFromClientset(fakeClientset)) + controller := NewNoExecuteTaintManager(fakeClientset, (&podHolder{pod: item.pod}).getPod, getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.taintedNodes = item.taintedNodes @@ -211,7 +227,7 @@ func TestCreatePod(t *testing.T) { func TestDeletePod(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset)) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.taintedNodes = map[string][]v1.Taint{ @@ -275,7 +291,7 @@ func TestUpdatePod(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() holder := &podHolder{} - controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, getNodeFromClientset(fakeClientset)) + controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.taintedNodes = item.taintedNodes @@ -341,7 +357,7 @@ func TestCreateNode(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.node}).getNode) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.node}).getNode, 
getPodsAssignedToNode(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.NodeUpdated(nil, item.node) @@ -364,7 +380,7 @@ func TestCreateNode(t *testing.T) { func TestDeleteNode(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset() - controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset)) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() controller.taintedNodes = map[string][]v1.Taint{ "node1": {createNoExecuteTaint(1)}, } @@ -462,7 +478,7 @@ func TestUpdateNode(t *testing.T) { for _, item := range testCases { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode, getPodsAssignedToNode(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.NodeUpdated(item.oldNode, item.newNode) @@ -528,7 +544,7 @@ func TestUpdateNodeWithMultiplePods(t *testing.T) { stopCh := make(chan struct{}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) sort.Sort(item.expectedDeleteTimes) - controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode) + controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode, getPodsAssignedToNode(fakeClientset)) controller.recorder = testutil.NewFakeRecorder() go controller.Run(stopCh) controller.NodeUpdated(item.oldNode, item.newNode) @@ -642,3 +658,111 @@ func TestGetMinTolerationTime(t *testing.T) { } } } + +// TestEventualConsistency verifies that if getPodsAssignedToNode returns incomplete data +// (e.g. due to watch latency), the remaining pods are eventually reconciled. +// This scenario is partially covered by TestUpdatePod, but since this is an important +// property of the TaintManager, it deserves an explicit test.
+func TestEventualConsistency(t *testing.T) { + testCases := []struct { + description string + pods []v1.Pod + prevPod *v1.Pod + newPod *v1.Pod + oldNode *v1.Node + newNode *v1.Node + expectDelete bool + }{ + { + description: "existing pod2 scheduled onto tainted Node", + pods: []v1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: testutil.NewPod("pod2", ""), + newPod: testutil.NewPod("pod2", "node1"), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectDelete: true, + }, + { + description: "existing pod2 with taint toleration scheduled onto tainted Node", + pods: []v1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: addToleration(testutil.NewPod("pod2", ""), 1, 100), + newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectDelete: false, + }, + { + description: "new pod2 created on tainted Node", + pods: []v1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: nil, + newPod: testutil.NewPod("pod2", "node1"), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectDelete: true, + }, + { + description: "new pod2 with taint toleration created on tainted Node", + pods: []v1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + prevPod: nil, + newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100), + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectDelete: false, + }, + } + + for _, item := range testCases { + stopCh := make(chan struct{}) + fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) + holder := &podHolder{} + controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, (&nodeHolder{item.newNode}).getNode, getPodsAssignedToNode(fakeClientset)) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(stopCh) + + if item.prevPod != nil { + holder.setPod(item.prevPod) + controller.PodUpdated(nil, item.prevPod) + } + + // First we simulate NodeUpdate that should delete 'pod1'. It doesn't know about 'pod2' yet. + controller.NodeUpdated(item.oldNode, item.newNode) + // TODO(mborsz): Remove this sleep and other sleeps in this file. + time.Sleep(timeForControllerToProgress) + + podDeleted := false + for _, action := range fakeClientset.Actions() { + if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + podDeleted = true + } + } + if !podDeleted { + t.Errorf("%v: Unexpected test result. Expected delete, got: %v", item.description, podDeleted) + } + fakeClientset.ClearActions() + + // Now the delayed update of 'pod2' reaches the TaintManager; it should be deleted unless it tolerates the taint. + holder.setPod(item.newPod) + controller.PodUpdated(item.prevPod, item.newPod) + // wait a bit + time.Sleep(timeForControllerToProgress) + + podDeleted = false + for _, action := range fakeClientset.Actions() { + if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + podDeleted = true + } + } + if podDeleted != item.expectDelete { + t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted) + } + close(stopCh) + } +}
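
For reference, the indexer-based lookup that this patch introduces in place of the apiserver List call can be reproduced with a few lines of client-go. The sketch below is illustrative only and is not code from the patch: the standalone wiring in main (kubeconfig loading, resync period, the node name "node1") and the helper names addNodeNameIndexer and podsByNode are assumptions; only the spec.nodeName index function mirrors the change made in node_lifecycle_controller.go above.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

const nodeNameKeyIndex = "spec.nodeName"

// addNodeNameIndexer registers an index mapping spec.nodeName to the pods bound to
// that node, so lookups are served from the local informer cache instead of a List
// call against the API server (the same index the patch adds in the controller).
func addNodeNameIndexer(podInformer cache.SharedIndexInformer) error {
	return podInformer.AddIndexers(cache.Indexers{
		nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || len(pod.Spec.NodeName) == 0 {
				return nil, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	})
}

// podsByNode mirrors the podByNodeNameLister closure from the patch.
func podsByNode(indexer cache.Indexer, nodeName string) ([]*v1.Pod, error) {
	objs, err := indexer.ByIndex(nodeNameKeyIndex, nodeName)
	if err != nil {
		return nil, err
	}
	pods := make([]*v1.Pod, 0, len(objs))
	for _, obj := range objs {
		if pod, ok := obj.(*v1.Pod); ok {
			pods = append(pods, pod)
		}
	}
	return pods, nil
}

func main() {
	// Assumed out-of-cluster wiring via the default kubeconfig; adjust as needed.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods().Informer()
	if err := addNodeNameIndexer(podInformer); err != nil {
		panic(err)
	}

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)
	cache.WaitForCacheSync(stopCh, podInformer.HasSynced)

	pods, err := podsByNode(podInformer.GetIndexer(), "node1")
	if err != nil {
		panic(err)
	}
	fmt.Printf("pods on node1: %d\n", len(pods))
}

Serving these lookups from the shared informer cache trades a small amount of controller memory for eliminating per-node List calls against the apiserver, which is the point of the change; the cache may lag behind the apiserver, which is why the patch also relies on tc.PodUpdated to reconcile pods that the indexer has not seen yet.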