Merge pull request #65350 from liggitt/simplify-taint-manager-key

Simplify taint manager workqueue keys
This commit is contained in:
k8s-ci-robot 2018-10-17 18:39:03 -07:00 committed by GitHub
commit 6f4b768c94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 138 additions and 81 deletions

View File

@ -345,7 +345,11 @@ func NewNodeLifecycleController(
nc.podInformerSynced = podInformer.Informer().HasSynced nc.podInformerSynced = podInformer.Informer().HasSynced
if nc.runTaintManager { if nc.runTaintManager {
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient) podLister := podInformer.Lister()
podGetter := func(name, namespace string) (*v1.Pod, error) { return podLister.Pods(namespace).Get(name) }
nodeLister := nodeInformer.Lister()
nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter)
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error { AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
nc.taintManager.NodeUpdated(nil, node) nc.taintManager.NodeUpdated(nil, node)

View File

@ -13,10 +13,12 @@ go_library(
"//pkg/apis/core/helper:go_default_library", "//pkg/apis/core/helper:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",

View File

@ -24,10 +24,12 @@ import (
"time" "time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientset "k8s.io/client-go/kubernetes" clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -51,39 +53,14 @@ const (
retries = 5 retries = 5
) )
// Needed to make workqueue work
type updateItemInterface interface{}
type nodeUpdateItem struct { type nodeUpdateItem struct {
oldNode *v1.Node nodeName string
newNode *v1.Node
newTaints []v1.Taint
} }
type podUpdateItem struct { type podUpdateItem struct {
oldPod *v1.Pod podName string
newPod *v1.Pod podNamespace string
newTolerations []v1.Toleration nodeName string
}
func (n *nodeUpdateItem) name() string {
if n.newNode != nil {
return n.newNode.ObjectMeta.Name
}
if n.oldNode != nil {
return n.oldNode.ObjectMeta.Name
}
return ""
}
func (p *podUpdateItem) nodeName() string {
if p.newPod != nil {
return p.newPod.Spec.NodeName
}
if p.oldPod != nil {
return p.oldPod.Spec.NodeName
}
return ""
} }
func hash(val string, max int) int { func hash(val string, max int) int {
@ -92,19 +69,27 @@ func hash(val string, max int) int {
return int(hasher.Sum32() % uint32(max)) return int(hasher.Sum32() % uint32(max))
} }
// GetPodFunc returns the pod for the specified name/namespace, or a NotFound error if missing.
type GetPodFunc func(name, namespace string) (*v1.Pod, error)
// GetNodeFunc returns the node for the specified name, or a NotFound error if missing.
type GetNodeFunc func(name string) (*v1.Node, error)
// NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods // NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
// from Nodes tainted with NoExecute Taints. // from Nodes tainted with NoExecute Taints.
type NoExecuteTaintManager struct { type NoExecuteTaintManager struct {
client clientset.Interface client clientset.Interface
recorder record.EventRecorder recorder record.EventRecorder
getPod GetPodFunc
getNode GetNodeFunc
taintEvictionQueue *TimedWorkerQueue taintEvictionQueue *TimedWorkerQueue
// keeps a map from nodeName to all noExecute taints on that Node // keeps a map from nodeName to all noExecute taints on that Node
taintedNodesLock sync.Mutex taintedNodesLock sync.Mutex
taintedNodes map[string][]v1.Taint taintedNodes map[string][]v1.Taint
nodeUpdateChannels []chan *nodeUpdateItem nodeUpdateChannels []chan nodeUpdateItem
podUpdateChannels []chan *podUpdateItem podUpdateChannels []chan podUpdateItem
nodeUpdateQueue workqueue.Interface nodeUpdateQueue workqueue.Interface
podUpdateQueue workqueue.Interface podUpdateQueue workqueue.Interface
@ -182,7 +167,7 @@ func getMinTolerationTime(tolerations []v1.Toleration) time.Duration {
// NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to // NewNoExecuteTaintManager creates a new NoExecuteTaintManager that will use passed clientset to
// communicate with the API server. // communicate with the API server.
func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager { func NewNoExecuteTaintManager(c clientset.Interface, getPod GetPodFunc, getNode GetNodeFunc) *NoExecuteTaintManager {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "taint-controller"})
eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartLogging(glog.Infof)
@ -196,6 +181,8 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
tm := &NoExecuteTaintManager{ tm := &NoExecuteTaintManager{
client: c, client: c,
recorder: recorder, recorder: recorder,
getPod: getPod,
getNode: getNode,
taintedNodes: make(map[string][]v1.Taint), taintedNodes: make(map[string][]v1.Taint),
nodeUpdateQueue: workqueue.New(), nodeUpdateQueue: workqueue.New(),
@ -211,8 +198,8 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
glog.V(0).Infof("Starting NoExecuteTaintManager") glog.V(0).Infof("Starting NoExecuteTaintManager")
for i := 0; i < UpdateWorkerSize; i++ { for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, NodeUpdateChannelSize)) tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize)) tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
} }
// Functions that are responsible for taking work items out of the workqueues and putting them // Functions that are responsible for taking work items out of the workqueues and putting them
@ -223,15 +210,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
if shutdown { if shutdown {
break break
} }
nodeUpdate := item.(*nodeUpdateItem) nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.name(), UpdateWorkerSize) hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select { select {
case <-stopCh: case <-stopCh:
tc.nodeUpdateQueue.Done(item) tc.nodeUpdateQueue.Done(item)
return return
case tc.nodeUpdateChannels[hash] <- nodeUpdate: case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
} }
tc.nodeUpdateQueue.Done(item)
} }
}(stopCh) }(stopCh)
@ -241,15 +228,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
if shutdown { if shutdown {
break break
} }
podUpdate := item.(*podUpdateItem) podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName(), UpdateWorkerSize) hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select { select {
case <-stopCh: case <-stopCh:
tc.podUpdateQueue.Done(item) tc.podUpdateQueue.Done(item)
return return
case tc.podUpdateChannels[hash] <- podUpdate: case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
} }
tc.podUpdateQueue.Done(item)
} }
}(stopCh) }(stopCh)
@ -274,6 +261,7 @@ func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan s
return return
case nodeUpdate := <-tc.nodeUpdateChannels[worker]: case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate) tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
case podUpdate := <-tc.podUpdateChannels[worker]: case podUpdate := <-tc.podUpdateChannels[worker]:
// If we found a Pod update we need to empty Node queue first. // If we found a Pod update we need to empty Node queue first.
priority: priority:
@ -281,63 +269,73 @@ func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan s
select { select {
case nodeUpdate := <-tc.nodeUpdateChannels[worker]: case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
tc.handleNodeUpdate(nodeUpdate) tc.handleNodeUpdate(nodeUpdate)
tc.nodeUpdateQueue.Done(nodeUpdate)
default: default:
break priority break priority
} }
} }
// After Node queue is emptied we process podUpdate. // After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate) tc.handlePodUpdate(podUpdate)
tc.podUpdateQueue.Done(podUpdate)
} }
} }
} }
// PodUpdated is used to notify NoExecuteTaintManager about Pod changes. // PodUpdated is used to notify NoExecuteTaintManager about Pod changes.
func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) { func (tc *NoExecuteTaintManager) PodUpdated(oldPod *v1.Pod, newPod *v1.Pod) {
podName := ""
podNamespace := ""
nodeName := ""
oldTolerations := []v1.Toleration{} oldTolerations := []v1.Toleration{}
if oldPod != nil { if oldPod != nil {
podName = oldPod.Name
podNamespace = oldPod.Namespace
nodeName = oldPod.Spec.NodeName
oldTolerations = oldPod.Spec.Tolerations oldTolerations = oldPod.Spec.Tolerations
} }
newTolerations := []v1.Toleration{} newTolerations := []v1.Toleration{}
if newPod != nil { if newPod != nil {
podName = newPod.Name
podNamespace = newPod.Namespace
nodeName = newPod.Spec.NodeName
newTolerations = newPod.Spec.Tolerations newTolerations = newPod.Spec.Tolerations
} }
if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName { if oldPod != nil && newPod != nil && helper.Semantic.DeepEqual(oldTolerations, newTolerations) && oldPod.Spec.NodeName == newPod.Spec.NodeName {
return return
} }
updateItem := &podUpdateItem{ updateItem := podUpdateItem{
oldPod: oldPod, podName: podName,
newPod: newPod, podNamespace: podNamespace,
newTolerations: newTolerations, nodeName: nodeName,
} }
tc.podUpdateQueue.Add(updateItemInterface(updateItem)) tc.podUpdateQueue.Add(updateItem)
} }
// NodeUpdated is used to notify NoExecuteTaintManager about Node changes. // NodeUpdated is used to notify NoExecuteTaintManager about Node changes.
func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) { func (tc *NoExecuteTaintManager) NodeUpdated(oldNode *v1.Node, newNode *v1.Node) {
nodeName := ""
oldTaints := []v1.Taint{} oldTaints := []v1.Taint{}
if oldNode != nil { if oldNode != nil {
oldTaints = oldNode.Spec.Taints nodeName = oldNode.Name
oldTaints = getNoExecuteTaints(oldNode.Spec.Taints)
} }
oldTaints = getNoExecuteTaints(oldTaints)
newTaints := []v1.Taint{} newTaints := []v1.Taint{}
if newNode != nil { if newNode != nil {
newTaints = newNode.Spec.Taints nodeName = newNode.Name
newTaints = getNoExecuteTaints(newNode.Spec.Taints)
} }
newTaints = getNoExecuteTaints(newTaints)
if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) { if oldNode != nil && newNode != nil && helper.Semantic.DeepEqual(oldTaints, newTaints) {
return return
} }
updateItem := &nodeUpdateItem{ updateItem := nodeUpdateItem{
oldNode: oldNode, nodeName: nodeName,
newNode: newNode,
newTaints: newTaints,
} }
tc.nodeUpdateQueue.Add(updateItemInterface(updateItem)) tc.nodeUpdateQueue.Add(updateItem)
} }
func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) { func (tc *NoExecuteTaintManager) cancelWorkWithEvent(nsName types.NamespacedName) {
@ -384,17 +382,26 @@ func (tc *NoExecuteTaintManager) processPodOnNode(
tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime) tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)
} }
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) { func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
// Delete pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if podUpdate.newPod == nil { if err != nil {
pod := podUpdate.oldPod if apierrors.IsNotFound(err) {
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} // Delete
glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName) podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
tc.cancelWorkWithEvent(podNamespacedName) glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return return
} }
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update // Create or Update
pod := podUpdate.newPod
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName) glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
nodeName := pod.Spec.NodeName nodeName := pod.Spec.NodeName
@ -412,23 +419,27 @@ func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {
if !ok { if !ok {
return return
} }
tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now()) tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
} }
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) { func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
// Delete node, err := tc.getNode(nodeUpdate.nodeName)
if nodeUpdate.newNode == nil { if err != nil {
node := nodeUpdate.oldNode if apierrors.IsNotFound(err) {
glog.V(4).Infof("Noticed node deletion: %#v", node.Name) // Delete
tc.taintedNodesLock.Lock() glog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
defer tc.taintedNodesLock.Unlock() tc.taintedNodesLock.Lock()
delete(tc.taintedNodes, node.Name) defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return return
} }
// Create or Update // Create or Update
glog.V(4).Infof("Noticed node update: %#v", nodeUpdate) glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
node := nodeUpdate.newNode taints := getNoExecuteTaints(node.Spec.Taints)
taints := nodeUpdate.newTaints
func() { func() {
tc.taintedNodesLock.Lock() tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock() defer tc.taintedNodesLock.Unlock()

View File

@ -19,6 +19,7 @@ package scheduler
import ( import (
"fmt" "fmt"
"sort" "sort"
"sync"
"testing" "testing"
"time" "time"
@ -32,6 +33,42 @@ import (
var timeForControllerToProgress = 500 * time.Millisecond var timeForControllerToProgress = 500 * time.Millisecond
func getPodFromClientset(clientset *fake.Clientset) GetPodFunc {
return func(name, namespace string) (*v1.Pod, error) {
return clientset.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
}
}
func getNodeFromClientset(clientset *fake.Clientset) GetNodeFunc {
return func(name string) (*v1.Node, error) {
return clientset.CoreV1().Nodes().Get(name, metav1.GetOptions{})
}
}
type podHolder struct {
pod *v1.Pod
sync.Mutex
}
func (p *podHolder) getPod(name, namespace string) (*v1.Pod, error) {
p.Lock()
defer p.Unlock()
return p.pod, nil
}
func (p *podHolder) setPod(pod *v1.Pod) {
p.Lock()
defer p.Unlock()
p.pod = pod
}
type nodeHolder struct {
node *v1.Node
}
func (n *nodeHolder) getNode(name string) (*v1.Node, error) {
return n.node, nil
}
func createNoExecuteTaint(index int) v1.Taint { func createNoExecuteTaint(index int) v1.Taint {
now := metav1.Now() now := metav1.Now()
return v1.Taint{ return v1.Taint{
@ -150,7 +187,7 @@ func TestCreatePod(t *testing.T) {
for _, item := range testCases { for _, item := range testCases {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset) controller := NewNoExecuteTaintManager(fakeClientset, (&podHolder{pod: item.pod}).getPod, getNodeFromClientset(fakeClientset))
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh) go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes controller.taintedNodes = item.taintedNodes
@ -174,7 +211,7 @@ func TestCreatePod(t *testing.T) {
func TestDeletePod(t *testing.T) { func TestDeletePod(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset) controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset))
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh) go controller.Run(stopCh)
controller.taintedNodes = map[string][]v1.Taint{ controller.taintedNodes = map[string][]v1.Taint{
@ -237,14 +274,17 @@ func TestUpdatePod(t *testing.T) {
for _, item := range testCases { for _, item := range testCases {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset) holder := &podHolder{}
controller := NewNoExecuteTaintManager(fakeClientset, holder.getPod, getNodeFromClientset(fakeClientset))
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh) go controller.Run(stopCh)
controller.taintedNodes = item.taintedNodes controller.taintedNodes = item.taintedNodes
holder.setPod(item.prevPod)
controller.PodUpdated(nil, item.prevPod) controller.PodUpdated(nil, item.prevPod)
fakeClientset.ClearActions() fakeClientset.ClearActions()
time.Sleep(timeForControllerToProgress) time.Sleep(timeForControllerToProgress)
holder.setPod(item.newPod)
controller.PodUpdated(item.prevPod, item.newPod) controller.PodUpdated(item.prevPod, item.newPod)
// wait a bit // wait a bit
time.Sleep(timeForControllerToProgress) time.Sleep(timeForControllerToProgress)
@ -301,7 +341,7 @@ func TestCreateNode(t *testing.T) {
for _, item := range testCases { for _, item := range testCases {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller := NewNoExecuteTaintManager(fakeClientset) controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.node}).getNode)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh) go controller.Run(stopCh)
controller.NodeUpdated(nil, item.node) controller.NodeUpdated(nil, item.node)
@ -324,7 +364,7 @@ func TestCreateNode(t *testing.T) {
func TestDeleteNode(t *testing.T) { func TestDeleteNode(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset() fakeClientset := fake.NewSimpleClientset()
controller := NewNoExecuteTaintManager(fakeClientset) controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), getNodeFromClientset(fakeClientset))
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
controller.taintedNodes = map[string][]v1.Taint{ controller.taintedNodes = map[string][]v1.Taint{
"node1": {createNoExecuteTaint(1)}, "node1": {createNoExecuteTaint(1)},
@ -422,7 +462,7 @@ func TestUpdateNode(t *testing.T) {
for _, item := range testCases { for _, item := range testCases {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
controller := NewNoExecuteTaintManager(fakeClientset) controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh) go controller.Run(stopCh)
controller.NodeUpdated(item.oldNode, item.newNode) controller.NodeUpdated(item.oldNode, item.newNode)
@ -488,7 +528,7 @@ func TestUpdateNodeWithMultiplePods(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods})
sort.Sort(item.expectedDeleteTimes) sort.Sort(item.expectedDeleteTimes)
controller := NewNoExecuteTaintManager(fakeClientset) controller := NewNoExecuteTaintManager(fakeClientset, getPodFromClientset(fakeClientset), (&nodeHolder{item.newNode}).getNode)
controller.recorder = testutil.NewFakeRecorder() controller.recorder = testutil.NewFakeRecorder()
go controller.Run(stopCh) go controller.Run(stopCh)
controller.NodeUpdated(item.oldNode, item.newNode) controller.NodeUpdated(item.oldNode, item.newNode)