diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
index 67f27f90e39..2388ac20f51 100644
--- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go
+++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go
@@ -18,6 +18,8 @@ package scheduler
 
 import (
 	"fmt"
+	"hash/fnv"
+	"io"
 	"sync"
 	"time"
 
@@ -58,6 +60,32 @@ type podUpdateItem struct {
 	newTolerations []v1.Toleration
 }
 
+func (n *nodeUpdateItem) name() string {
+	if n.newNode != nil {
+		return n.newNode.ObjectMeta.Name
+	}
+	if n.oldNode != nil {
+		return n.oldNode.ObjectMeta.Name
+	}
+	return ""
+}
+
+func (p *podUpdateItem) nodeName() string {
+	if p.newPod != nil {
+		return p.newPod.Spec.NodeName
+	}
+	if p.oldPod != nil {
+		return p.oldPod.Spec.NodeName
+	}
+	return ""
+}
+
+func hash(val string) int {
+	hasher := fnv.New32a()
+	io.WriteString(hasher, val)
+	return int(hasher.Sum32())
+}
+
 // NoExecuteTaintManager listens to Taint/Toleration changes and is responsible for removing Pods
 // from Nodes tainted with NoExecute Taints.
 type NoExecuteTaintManager struct {
@@ -69,8 +97,8 @@ type NoExecuteTaintManager struct {
 	taintedNodesLock sync.Mutex
 	taintedNodes     map[string][]v1.Taint
 
-	nodeUpdateChannel chan *nodeUpdateItem
-	podUpdateChannel  chan *podUpdateItem
+	nodeUpdateChannels []chan *nodeUpdateItem
+	podUpdateChannels  []chan *podUpdateItem
 
 	nodeUpdateQueue workqueue.Interface
 	podUpdateQueue  workqueue.Interface
@@ -160,11 +188,9 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
 	}
 
 	tm := &NoExecuteTaintManager{
-		client:            c,
-		recorder:          recorder,
-		taintedNodes:      make(map[string][]v1.Taint),
-		nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),
-		podUpdateChannel:  make(chan *podUpdateItem, podUpdateChannelSize),
+		client:       c,
+		recorder:     recorder,
+		taintedNodes: make(map[string][]v1.Taint),
 
 		nodeUpdateQueue: workqueue.New(),
 		podUpdateQueue:  workqueue.New(),
@@ -177,6 +203,15 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
 // Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
 func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 	glog.V(0).Infof("Starting NoExecuteTaintManager")
+
+	// TODO: Figure out a reasonable number of workers and propagate the
+	// number of workers up making it a parameter of Run() function.
+	workers := 8
+	for i := 0; i < workers; i++ {
+		tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, nodeUpdateChannelSize))
+		tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize))
+	}
+
 	// Functions that are responsible for taking work items out of the workqueues and putting them
 	// into channels.
 	go func(stopCh <-chan struct{}) {
@@ -186,10 +221,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 				break
 			}
 			nodeUpdate := item.(*nodeUpdateItem)
+			hash := hash(nodeUpdate.name())
 			select {
 			case <-stopCh:
 				break
-			case tc.nodeUpdateChannel <- nodeUpdate:
+			case tc.nodeUpdateChannels[hash%workers] <- nodeUpdate:
 			}
 		}
 	}(stopCh)
@@ -201,14 +237,26 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 				break
 			}
 			podUpdate := item.(*podUpdateItem)
+			hash := hash(podUpdate.nodeName())
 			select {
 			case <-stopCh:
 				break
-			case tc.podUpdateChannel <- podUpdate:
+			case tc.podUpdateChannels[hash%workers] <- podUpdate:
 			}
 		}
 	}(stopCh)
 
+	wg := sync.WaitGroup{}
+	wg.Add(workers)
+	for i := 0; i < workers; i++ {
+		go tc.worker(i, wg.Done, stopCh)
+	}
+	wg.Wait()
+}
+
+func (tc *NoExecuteTaintManager) worker(worker int, done func(), stopCh <-chan struct{}) {
+	defer done()
+
 	// When processing events we want to prioritize Node updates over Pod updates,
 	// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible
 	// - we don't want user (or system) to wait until PodUpdate queue is drained before it can
@@ -216,15 +264,15 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
 	for {
 		select {
 		case <-stopCh:
-			break
-		case nodeUpdate := <-tc.nodeUpdateChannel:
+			return
+		case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
 			tc.handleNodeUpdate(nodeUpdate)
-		case podUpdate := <-tc.podUpdateChannel:
+		case podUpdate := <-tc.podUpdateChannels[worker]:
 			// If we found a Pod update we need to empty Node queue first.
 		priority:
 			for {
 				select {
-				case nodeUpdate := <-tc.nodeUpdateChannel:
+				case nodeUpdate := <-tc.nodeUpdateChannels[worker]:
 					tc.handleNodeUpdate(nodeUpdate)
 				default:
 					break priority