Taint nodes in parallel.
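
Instead of applying NoSchedule condition taints inline from the informer event
handlers, the node lifecycle controller now enqueues node updates onto a
workqueue; a dispatcher goroutine hashes each node name onto one of
scheduler.UpdateWorkerSize (8) channels, and a pool of workers drains those
channels and taints nodes in parallel. This reuses the fan-out pattern already
in NoExecuteTaintManager, whose worker/channel sizes are now exported, and the
controller's long-running goroutines now honor stopCh instead of
wait.NeverStop so they shut down cleanly.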

Signed-off-by: Klaus Ma <klaus1982.cn@gmail.com>
Klaus Ma 2018-08-31 15:26:19 +08:00
parent e0782b99f1
commit 85a19b109a
4 changed files with 87 additions and 21 deletions

View File

@@ -36,6 +36,7 @@ go_library(
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
         "//staging/src/k8s.io/client-go/tools/record:go_default_library",
         "//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
+        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
         "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
     ],

View File

@@ -24,6 +24,8 @@ package nodelifecycle
 import (
     "context"
     "fmt"
+    "hash/fnv"
+    "io"
     "sync"
     "time"
@@ -46,6 +48,7 @@ import (
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/flowcontrol"
+    "k8s.io/client-go/util/workqueue"
     v1node "k8s.io/kubernetes/pkg/api/v1/node"
     "k8s.io/kubernetes/pkg/cloudprovider"
     "k8s.io/kubernetes/pkg/controller"
@@ -218,6 +221,9 @@ type Controller struct {
     // if set to true, NodeController will taint Nodes based on its condition for 'NetworkUnavailable',
     // 'MemoryPressure', 'OutOfDisk' and 'DiskPressure'.
     taintNodeByCondition bool
+
+    nodeUpdateChannels []chan *v1.Node
+    nodeUpdateQueue    workqueue.Interface
 }

 // NewNodeLifecycleController returns a new taint controller.
@@ -276,6 +282,7 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer,
         runTaintManager:        runTaintManager,
         useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
         taintNodeByCondition:   taintNodeByCondition,
+        nodeUpdateQueue:        workqueue.New(),
     }

     if useTaintBasedEvictions {
         glog.Infof("Controller is using taint based evictions.")
@@ -343,10 +350,12 @@ func NewNodeLifecycleController(podInformer coreinformers.PodInformer,
         glog.Infof("Controller will taint node by condition.")
         nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
             AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
-                return nc.doNoScheduleTaintingPass(node)
+                nc.nodeUpdateQueue.Add(node)
+                return nil
             }),
             UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
-                return nc.doNoScheduleTaintingPass(newNode)
+                nc.nodeUpdateQueue.Add(newNode)
+                return nil
             }),
         })
     }
@@ -383,18 +392,52 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
     }

     if nc.runTaintManager {
-        go nc.taintManager.Run(wait.NeverStop)
+        go nc.taintManager.Run(stopCh)
+    }
+
+    if nc.taintNodeByCondition {
+        for i := 0; i < scheduler.UpdateWorkerSize; i++ {
+            nc.nodeUpdateChannels = append(nc.nodeUpdateChannels, make(chan *v1.Node, scheduler.NodeUpdateChannelSize))
+        }
+
+        // Dispatcher
+        go func(stopCh <-chan struct{}) {
+            for {
+                obj, shutdown := nc.nodeUpdateQueue.Get()
+                if shutdown {
+                    break
+                }
+                node := obj.(*v1.Node)
+                hash := hash(node.Name, scheduler.UpdateWorkerSize)
+                select {
+                case <-stopCh:
+                    nc.nodeUpdateQueue.Done(node)
+                    return
+                case nc.nodeUpdateChannels[hash] <- node:
+                }
+                nc.nodeUpdateQueue.Done(node)
+            }
+        }(stopCh)
+
+        // Close node update queue to cleanup go routine.
+        defer nc.nodeUpdateQueue.ShutDown()
+
+        // Start workers to update NoSchedule taint for nodes.
+        for i := 0; i < scheduler.UpdateWorkerSize; i++ {
+            go nc.doNoScheduleTaintingPassWorker(i, stopCh)
+        }
     }

     if nc.useTaintBasedEvictions {
         // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
         // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
-        go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
+        go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
     } else {
         // Managing eviction of nodes:
         // When we delete pods off a node, if the node was not empty at the time we then
         // queue an eviction watcher. If we hit an error, retry deletion.
-        go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, wait.NeverStop)
+        go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
     }

     // Incorporate the results of node status pushed from kubelet to master.
@@ -402,7 +445,7 @@ func (nc *Controller) Run(stopCh <-chan struct{}) {
         if err := nc.monitorNodeStatus(); err != nil {
             glog.Errorf("Error monitoring node status: %v", err)
         }
-    }, nc.nodeMonitorPeriod, wait.NeverStop)
+    }, nc.nodeMonitorPeriod, stopCh)

     <-stopCh
 }
@@ -445,6 +488,19 @@ func (nc *Controller) doFixDeprecatedTaintKeyPass(node *v1.Node) error {
     return nil
 }

+func (nc *Controller) doNoScheduleTaintingPassWorker(i int, stopCh <-chan struct{}) {
+    for {
+        select {
+        case <-stopCh:
+            return
+        case node := <-nc.nodeUpdateChannels[i]:
+            if err := nc.doNoScheduleTaintingPass(node); err != nil {
+                glog.Errorf("Failed to taint NoSchedule on node <%s>: %v", node.Name, err)
+            }
+        }
+    }
+}
+
 func (nc *Controller) doNoScheduleTaintingPass(node *v1.Node) error {
     // Map node's condition to Taints.
     var taints []v1.Taint
@@ -1197,3 +1253,9 @@ func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition)
         return notReadyNodes, stateNormal
     }
 }
+
+func hash(val string, max int) int {
+    hasher := fnv.New32a()
+    io.WriteString(hasher, val)
+    return int(hasher.Sum32()) % max
+}
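
The dispatcher/worker split above is easier to see in isolation. Below is a
minimal, self-contained sketch of the same fan-out pattern in plain Go (a
buffered channel stands in for the client-go workqueue, and processNode is an
illustrative stand-in for doNoScheduleTaintingPass): hashing by node name pins
every update for a given node to the same worker, so per-node ordering is
preserved without locks while distinct nodes are tainted in parallel.

package main

import (
    "fmt"
    "hash/fnv"
    "io"
    "sync"
)

const (
    updateWorkerSize      = 8
    nodeUpdateChannelSize = 10
)

// hash maps a node name to a worker index, as in the diff above.
func hash(val string, max int) int {
    hasher := fnv.New32a()
    io.WriteString(hasher, val)
    return int(hasher.Sum32()) % max
}

// processNode is a hypothetical stand-in for doNoScheduleTaintingPass.
func processNode(worker int, name string) {
    fmt.Printf("worker %d handling node %s\n", worker, name)
}

func main() {
    updates := make(chan string, 100) // stands in for nodeUpdateQueue

    // Per-worker channels, as in nc.nodeUpdateChannels.
    channels := make([]chan string, updateWorkerSize)
    for i := range channels {
        channels[i] = make(chan string, nodeUpdateChannelSize)
    }

    var wg sync.WaitGroup
    wg.Add(updateWorkerSize)
    for i := 0; i < updateWorkerSize; i++ {
        go func(i int) { // worker: drains only its own channel
            defer wg.Done()
            for name := range channels[i] {
                processNode(i, name)
            }
        }(i)
    }

    // Dispatcher: route each update to the worker that owns the node.
    go func() {
        for name := range updates {
            channels[hash(name, updateWorkerSize)] <- name
        }
        for _, ch := range channels {
            close(ch)
        }
    }()

    for _, n := range []string{"node-1", "node-2", "node-1", "node-3"} {
        updates <- n
    }
    close(updates)
    wg.Wait()
}

The real controller additionally relies on the workqueue's Get/Done protocol,
so an update that arrives for a node while it is being dispatched is re-queued
rather than dropped.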

View File

@@ -40,9 +40,15 @@ import (
 )

 const (
-    nodeUpdateChannelSize = 10
-    podUpdateChannelSize  = 1
-    retries               = 5
+    // TODO (k82cn): Figure out a reasonable number of workers/channels and propagate
+    // the number of workers up making it a paramater of Run() function.
+
+    // NodeUpdateChannelSize defines the size of channel for node update events.
+    NodeUpdateChannelSize = 10
+    // UpdateWorkerSize defines the size of workers for node update or/and pod update.
+    UpdateWorkerSize = 8
+    podUpdateChannelSize = 1
+    retries              = 5
 )

 // Needed to make workqueue work
@@ -204,11 +210,8 @@ func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {
 func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
     glog.V(0).Infof("Starting NoExecuteTaintManager")

-    // TODO: Figure out a reasonable number of workers and propagate the
-    // number of workers up making it a paramater of Run() function.
-    workers := 8
-    for i := 0; i < workers; i++ {
-        tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, nodeUpdateChannelSize))
+    for i := 0; i < UpdateWorkerSize; i++ {
+        tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan *nodeUpdateItem, NodeUpdateChannelSize))
         tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan *podUpdateItem, podUpdateChannelSize))
     }
@@ -221,11 +224,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
                 break
             }
             nodeUpdate := item.(*nodeUpdateItem)
-            hash := hash(nodeUpdate.name(), workers)
+            hash := hash(nodeUpdate.name(), UpdateWorkerSize)
             select {
             case <-stopCh:
                 tc.nodeUpdateQueue.Done(item)
-                break
+                return
             case tc.nodeUpdateChannels[hash] <- nodeUpdate:
             }
             tc.nodeUpdateQueue.Done(item)
@@ -239,11 +242,11 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
                 break
             }
             podUpdate := item.(*podUpdateItem)
-            hash := hash(podUpdate.nodeName(), workers)
+            hash := hash(podUpdate.nodeName(), UpdateWorkerSize)
             select {
             case <-stopCh:
                 tc.podUpdateQueue.Done(item)
-                break
+                return
             case tc.podUpdateChannels[hash] <- podUpdate:
             }
             tc.podUpdateQueue.Done(item)
@@ -251,8 +254,8 @@ func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
     }(stopCh)

     wg := sync.WaitGroup{}
-    wg.Add(workers)
-    for i := 0; i < workers; i++ {
+    wg.Add(UpdateWorkerSize)
+    for i := 0; i < UpdateWorkerSize; i++ {
         go tc.worker(i, wg.Done, stopCh)
     }
     wg.Wait()
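
One behavioral fix worth calling out in these hunks: in Go, a break inside a
select exits only the select statement, not the enclosing for loop, so the old
dispatchers would spin through another Get() after stopCh fired; return
actually stops the goroutine. A tiny self-contained illustration (the stop
channel and loop bound are just for demonstration):

package main

import "fmt"

func main() {
    stopCh := make(chan struct{})
    close(stopCh) // stop is already signaled

    iterations := 0
    for iterations < 3 {
        iterations++
        select {
        case <-stopCh:
            break // exits only the select; the for loop runs again
        }
    }
    fmt.Println("looped", iterations, "times despite stopCh being closed")
}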

View File

@@ -639,7 +639,7 @@ func TestTaintNodeByCondition(t *testing.T) {
             t.Errorf("Failed to create node, err: %v", err)
         }

         if err := waitForNodeTaints(cs, node, test.expectedTaints); err != nil {
-            t.Errorf("Failed to taint node, err: %v", err)
+            t.Errorf("Failed to taint node <%s>, err: %v", node.Name, err)
         }

         var pods []*v1.Pod