Merge pull request #79370 from mborsz/node

Use watch in TaintManager
Merged by Kubernetes Prow Robot on 2019-06-26 06:49:17 -07:00 (committed via GitHub)
commit eed7af6e41
4 changed files with 190 additions and 44 deletions

pkg/controller/nodelifecycle/node_lifecycle_controller.go

@@ -126,7 +126,8 @@ const (
const (
// The amount of time the nodecontroller should sleep between retrying node health updates
-retrySleepTime = 20 * time.Millisecond
+retrySleepTime   = 20 * time.Millisecond
+nodeNameKeyIndex = "spec.nodeName"
)
// labelReconcileInfo lists Node labels to reconcile, and how to reconcile them.
@@ -364,11 +365,40 @@ func NewNodeLifecycleController(
nc.podInformerSynced = podInformer.Informer().HasSynced
if nc.runTaintManager {
podInformer.Informer().AddIndexers(cache.Indexers{
nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return []string{}, nil
}
if len(pod.Spec.NodeName) == 0 {
return []string{}, nil
}
return []string{pod.Spec.NodeName}, nil
},
})
podIndexer := podInformer.Informer().GetIndexer()
podLister := podInformer.Lister()
podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
podByNodeNameLister := func(nodeName string) ([]v1.Pod, error) {
objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
if err != nil {
return nil, err
}
pods := make([]v1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*v1.Pod)
if !ok {
continue
}
pods = append(pods, *pod)
}
return pods, nil
}
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
-nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter)
+nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, podByNodeNameLister)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node)

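The indexer registered above is the core of this change: pod-by-node lookups move from per-update List calls against the API server to the watch-backed informer cache. A minimal standalone sketch of the same pattern, using a bare `cache.Indexer` rather than the informer wiring in the controller (the setup here is illustrative, only the index function mirrors the diff):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Index pods by the node they are bound to, mirroring nodeNameKeyIndex above.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"spec.nodeName": func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || len(pod.Spec.NodeName) == 0 {
				return []string{}, nil
			}
			return []string{pod.Spec.NodeName}, nil
		},
	})

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "pod1", Namespace: "default"},
		Spec:       v1.PodSpec{NodeName: "node1"},
	}
	if err := indexer.Add(pod); err != nil {
		panic(err)
	}

	// A local map lookup replaces a List call with a spec.nodeName field selector.
	objs, err := indexer.ByIndex("spec.nodeName", "node1")
	fmt.Println(len(objs), err) // 1 <nil>
}
```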
pkg/controller/nodelifecycle/scheduler/BUILD

@@ -15,8 +15,6 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
@@ -42,6 +40,8 @@ go_test(
"//pkg/controller/testutil:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",

pkg/controller/nodelifecycle/scheduler/taint_manager.go

@@ -26,8 +26,6 @@ import (
"k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes"
@@ -75,13 +73,17 @@ type GetPodFunc func(name, namespace string) (*v1.Pod, error)
// GetNodeFunc returns the node for the specified name, or a NotFound error if missing.
type GetNodeFunc func(name string) (*v1.Node, error)
// GetPodsByNodeNameFunc returns the list of pods assigned to the specified node.
type GetPodsByNodeNameFunc func(nodeName string) ([]v1.Pod, error)
// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct {
-client   clientset.Interface
-recorder record.EventRecorder
-getPod   GetPodFunc
-getNode  GetNodeFunc
+client                clientset.Interface
+recorder              record.EventRecorder
+getPod                GetPodFunc
+getNode               GetNodeFunc
+getPodsAssignedToNode GetPodsByNodeNameFunc
taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node
@@ -125,25 +127,6 @@ func getNoExecuteTaints(taints []v1.Taint) []v1.Taint {
return result
}
-func getPodsAssignedToNode(c clientset.Interface, nodeName string) ([]v1.Pod, error) {
-selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
-pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{
-FieldSelector: selector.String(),
-LabelSelector: labels.Everything().String(),
-})
-for i := 0; i < retries && err != nil; i++ {
-pods, err = c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{
-FieldSelector: selector.String(),
-LabelSelector: labels.Everything().String(),
-})
-time.Sleep(100 * time.Millisecond)
-}
-if err != nil {
-return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
-}
-return pods.Items, nil
-}
// getMinTolerationTime returns minimal toleration time from the given slice, or -1 if it's infinite.
func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
minTolerationTime := int64(-1)
@@ -167,7 +150,7 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server.
-func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc) *NoExecuteTaintManager {
+func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc, getPodsAssignedToNode GetPodsByNodeNameFunc) *NoExecuteTaintManager {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
eventBroadcaster.StartLogging(klog.Infof)
@@ -179,11 +162,12 @@ func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode
}
tm := &NoExecuteTaintManager{
-client:       c,
-recorder:     recorder,
-getPod:       getPod,
-getNode:      getNode,
-taintedNodes: make(map[string][]v1.Taint),
+client:                c,
+recorder:              recorder,
+getPod:                getPod,
+getNode:               getNode,
+getPodsAssignedToNode: getPodsAssignedToNode,
+taintedNodes:          make(map[string][]v1.Taint),
nodeUpdateQueue: workqueue.NewNamed("noexec_taint_node"),
podUpdateQueue: workqueue.NewNamed("noexec_taint_pod"),
@@ -228,6 +212,10 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
if shutdown {
break
}
+// Pods are deliberately processed by the same worker as their node: this avoids
+// races between the node worker setting tc.taintedNodes and the pod worker
+// reading it to decide whether to delete a pod.
+// It's possible that this code is correct even without that assumption.
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
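The `hash` helper used here is defined elsewhere in this package and does not appear in the diff; what matters is that the same node name always maps to the same worker index, so node and pod updates for one node are serialized through a single goroutine. A sketch of that idea — the FNV-32a choice is an assumption, not quoted from the source:

```go
package main

import (
	"fmt"
	"hash/fnv"
	"io"
)

// hashToWorker deterministically maps a node name to one of n workers, so all
// updates for the same node land on the same queue. FNV-32a is assumed here;
// the diff does not show the real hash helper.
func hashToWorker(nodeName string, n int) int {
	h := fnv.New32a()
	io.WriteString(h, nodeName)
	return int(h.Sum32() % uint32(n))
}

func main() {
	const workers = 8
	// The same name always yields the same worker index in [0, workers).
	fmt.Println(hashToWorker("node1", workers) == hashToWorker("node1", workers)) // true
}
```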
@@ -450,7 +438,11 @@ func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
tc.taintedNodes[node.Name] = taints
}
}()
-pods, err := getPodsAssignedToNode(tc.client, node.Name)
+// It is critical that tc.taintedNodes is updated before getPodsAssignedToNode is called:
+// the pod snapshot it returns may be arbitrarily stale, which is safe as long as every
+// future pod update goes through tc.PodUpdated, which consults tc.taintedNodes and can
+// evict any pod the stale snapshot missed.
+pods, err := tc.getPodsAssignedToNode(node.Name)
if err != nil {
klog.Errorf(err.Error())
return

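To make the ordering argument in that comment concrete, here is a runnable toy model of the two workers. Everything here is hypothetical and simplified; in the real controller the steps are serialized by the shared worker shown in Run, not by sequential code:

```go
package main

import "fmt"

// Illustrative only: why publishing taints before reading a possibly stale
// pod snapshot is safe.
func main() {
	taintedNodes := map[string][]string{}
	staleSnapshot := []string{"pod1"} // "pod2" was bound too recently to appear here

	// Node worker: publish the taints first, then act on the stale snapshot.
	taintedNodes["node1"] = []string{"NoExecute"}
	for _, pod := range staleSnapshot {
		fmt.Println("node worker evicts:", pod)
	}

	// Pod worker: the delayed update for "pod2" still observes the published
	// taints, so the pod missed by the snapshot is evicted anyway.
	if len(taintedNodes["node1"]) > 0 {
		fmt.Println("pod worker evicts: pod2")
	}
}
```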
pkg/controller/nodelifecycle/scheduler/taint_manager_test.go

@@ -23,7 +23,9 @@ import (
"testing"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/controller/testutil"
@@ -39,6 +41,20 @@ func getPodFromClientset(clientset *fake.Clientset) GetPodFunc {
}
}
func getPodsAssignedToNode(c *fake.Clientset) GetPodsByNodeNameFunc {
return func(nodeName string) ([]v1.Pod, error) {
selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
pods, err := c.CoreV1().Pods(v1.NamespaceAll).List(metav1.ListOptions{
FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(),
})
if err != nil {
return []v1.Pod{}, fmt.Errorf("failed to get Pods assigned to node %v", nodeName)
}
return pods.Items, nil
}
}
func getNodeFromClientset(clientset *fake.Clientset) GetNodeFunc {
return func(name string) (*v1.Node, error) {
return clientset.CoreV1().Nodes().Get(name, metav1.GetOptions{})
@@ -187,7 +203,7 @@ func TestCreatePod(t *testing.T) {
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
-controller := NewNoExecuteTaintManager(fakeClientset, (&podHolder{pod: item.pod}).getPod, getNodeFromClientset(fakeClientset))
+controller := NewNoExecuteTaintManager(fakeClientset, (&podHolder{pod: item.pod}).getPod, getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes
@@ -211,7 +227,7 @@
func TestDeletePod(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
-controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset))
+controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.taintedNodes = map[string][]v1.Taint{
@@ -275,7 +291,7 @@ func TestUpdatePod(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
holder := &podHolder{}
-controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, getNodeFromClientset(fakeClientset))
+controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes
@@ -341,7 +357,7 @@ func TestCreateNode(t *testing.T) {
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
-controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.node}).getNode)
+controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.node}).getNode, getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.NodeUpdated(nil, item.node)
@@ -364,7 +380,7 @@ func TestCreateNode(t *testing.T) {
func TestDeleteNode(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset()
-controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset))
+controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset), getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)},
@@ -462,7 +478,7 @@ func TestUpdateNode(t *testing.T) {
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
-controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode)
+controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode, getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.NodeUpdated(item.oldNode, item.newNode)
@@ -528,7 +544,7 @@ func TestUpdateNodeWithMultiplePods(t *testing.T) {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
sort.Sort(item.expectedDeleteTimes)
-controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode)
+controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode, getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
controller.NodeUpdated(item.oldNode, item.newNode)
@@ -642,3 +658,111 @@ func TestGetMinTolerationTime(t *testing.T) {
}
}
}
// TestEventualConsistency verifies that if getPodsAssignedToNode returns incomplete data
// (e.g. due to watch latency), the remaining pods are eventually reconciled.
// This scenario is partially covered by TestUpdatePod, but since this is an important
// property of the TaintManager it deserves an explicit test.
func TestEventualConsistency(t *testing.T) {
testCases := []struct {
description string
pods []v1.Pod
prevPod *v1.Pod
newPod *v1.Pod
oldNode *v1.Node
newNode *v1.Node
expectDelete bool
}{
{
description: "existing pod2 scheduled onto tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: testutil.NewPod("pod2", ""),
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: true,
},
{
description: "existing pod2 with taint toleration scheduled onto tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: addToleration(testutil.NewPod("pod2", ""), 1, 100),
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
},
{
description: "new pod2 created on tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: testutil.NewPod("pod2", "node1"),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: true,
},
{
description: "new pod2 with taint toleration created on tainted Node",
pods: []v1.Pod{
*testutil.NewPod("pod1", "node1"),
},
prevPod: nil,
newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100),
oldNode: testutil.NewNode("node1"),
newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}),
expectDelete: false,
},
}
for _, item := range testCases {
stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
holder := &podHolder{}
controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, (&nodeHolder{item.newNode}).getNode, getPodsAssignedToNode(fakeClientset))
controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh)
if item.prevPod != nil {
holder.setPod(item.prevPod)
controller.PodUpdated(nil, item.prevPod)
}
// First we simulate a NodeUpdate that should delete 'pod1'; the controller doesn't know about 'pod2' yet.
controller.NodeUpdated(item.oldNode, item.newNode)
// TODO(mborsz): Remove this sleep and other sleeps in this file.
time.Sleep(timeForControllerToProgress)
podDeleted := false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if !podDeleted {
t.Errorf("%v: Unexpected test result. Expected delete, got: %v", item.description, podDeleted)
}
fakeClientset.ClearActions()
// And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well.
holder.setPod(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit
time.Sleep(timeForControllerToProgress)
podDeleted = false
for _, action := range fakeClientset.Actions() {
if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" {
podDeleted = true
}
}
if podDeleted != item.expectDelete {
t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted)
}
close(stopCh)
}
}
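To exercise just the new test from a kubernetes/kubernetes checkout (package path assumed from the files above): `go test ./pkg/controller/nodelifecycle/scheduler/ -run TestEventualConsistency`.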