Split NodeController rate limiters between zones

commit 5677a9845e
parent eecbfb1a28
Author: gmarek
Date: 2016-07-12 14:29:46 +02:00

5 changed files with 184 additions and 81 deletions

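The gist of the change: instead of a single eviction queue fed by one flowcontrol.RateLimiter, the controller keeps a RateLimitedTimedQueue per zone, each with its own token-bucket limiter built from the new evictionLimiterQPS/evictionLimiterBurst settings, so evictions in one zone no longer share a rate budget with the others. A minimal sketch of that shape, using stand-in types rather than the controller's real ones:

```go
package main

import (
	"fmt"
	"sync"
)

// evictionQueue stands in for RateLimitedTimedQueue: node names waiting for
// pod eviction, drained at a rate bounded by the queue's own QPS/burst.
type evictionQueue struct {
	qps   float32
	burst int
	nodes []string
}

// zonedEvictors mirrors the new zonePodEvictor map: one queue per zone.
type zonedEvictors struct {
	mu     sync.Mutex
	qps    float32
	burst  int
	byZone map[string]*evictionQueue
}

// add enqueues a node in its zone's queue, creating the queue on first use.
func (z *zonedEvictors) add(zone, node string) {
	z.mu.Lock()
	defer z.mu.Unlock()
	q, ok := z.byZone[zone]
	if !ok {
		q = &evictionQueue{qps: z.qps, burst: z.burst}
		z.byZone[zone] = q
	}
	q.nodes = append(q.nodes, node)
}

func main() {
	z := &zonedEvictors{qps: 0.1, burst: 1, byZone: map[string]*evictionQueue{}}
	z.add("zone-a", "node-1")
	z.add("zone-b", "node-2")
	fmt.Printf("%d zones, each with its own rate limiter settings\n", len(z.byZone))
}
```

In the real controller the queue is a RateLimitedTimedQueue and the per-zone limiter comes from flowcontrol.NewTokenBucketRateLimiter, as the hunks below show.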

@@ -73,13 +73,12 @@ type nodeStatusData struct {
 }
 type NodeController struct {
-	allocateNodeCIDRs bool
-	cloud cloudprovider.Interface
-	clusterCIDR *net.IPNet
-	serviceCIDR *net.IPNet
-	deletingPodsRateLimiter flowcontrol.RateLimiter
-	knownNodeSet map[string]*api.Node
-	kubeClient clientset.Interface
+	allocateNodeCIDRs bool
+	cloud cloudprovider.Interface
+	clusterCIDR *net.IPNet
+	serviceCIDR *net.IPNet
+	knownNodeSet map[string]*api.Node
+	kubeClient clientset.Interface
 	// Method for easy mocking in unittest.
 	lookupIP func(host string) ([]net.IP, error)
 	// Value used if sync_nodes_status=False. NodeController will not proactively
@@ -112,9 +111,11 @@ type NodeController struct {
 	// Lock to access evictor workers
 	evictorLock *sync.Mutex
 	// workers that evicts pods from unresponsive nodes.
-	podEvictor *RateLimitedTimedQueue
-	terminationEvictor *RateLimitedTimedQueue
-	podEvictionTimeout time.Duration
+	zonePodEvictor map[string]*RateLimitedTimedQueue
+	zoneTerminationEvictor map[string]*RateLimitedTimedQueue
+	evictionLimiterQPS float32
+	evictionLimiterBurst int
+	podEvictionTimeout time.Duration
 	// The maximum duration before a pod evicted from a node can be forcefully terminated.
 	maximumGracePeriod time.Duration
 	recorder record.EventRecorder
@@ -142,8 +143,8 @@ func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient clientset.Interface,
 	podEvictionTimeout time.Duration,
-	deletionEvictionLimiter flowcontrol.RateLimiter,
-	terminationEvictionLimiter flowcontrol.RateLimiter,
+	evictionLimiterQPS float32,
+	evictionLimiterBurst int,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
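NewNodeController now takes the raw QPS and burst numbers instead of two pre-built flowcontrol.RateLimiter values, and builds a token-bucket limiter per zone later on (see the monitorNodeStatus hunk below). As a rough illustration of what those two parameters mean, here is a standalone example using golang.org/x/time/rate as a stand-in for the flowcontrol token bucket; the values are made up, not the controller's defaults:

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// qps is the sustained refill rate; burst is how many evictions may be
	// admitted back-to-back before the rate kicks in. Example values only.
	qps, burst := 0.1, 10

	limiter := rate.NewLimiter(rate.Limit(qps), burst)

	admitted := 0
	for i := 0; i < 100; i++ {
		if limiter.Allow() { // non-blocking admission check
			admitted++
		}
	}
	// Only the burst is admitted immediately; further requests must wait for refill.
	fmt.Printf("admitted %d of 100 immediate requests\n", admitted)
}
```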
@@ -184,8 +185,8 @@ func NewNodeController(
 		podEvictionTimeout: podEvictionTimeout,
 		maximumGracePeriod: 5 * time.Minute,
 		evictorLock: &evictorLock,
-		podEvictor: NewRateLimitedTimedQueue(deletionEvictionLimiter),
-		terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter),
+		zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
+		zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
 		nodeStatusMap: make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod: nodeMonitorPeriod,
@@ -198,6 +199,8 @@ func NewNodeController(
 		forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
 		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
 		computeZoneStateFunc: ComputeZoneState,
+		evictionLimiterQPS: evictionLimiterQPS,
+		evictionLimiterBurst: evictionLimiterBurst,
 		zoneStates: make(map[string]zoneState),
 	}
@@ -309,18 +312,20 @@ func (nc *NodeController) Run(period time.Duration) {
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
-			if err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
-				return false, 0
-			}
+		for k := range nc.zonePodEvictor {
+			nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
+				remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
+					return false, 0
+				}
-			if remaining {
-				nc.terminationEvictor.Add(value.Value)
-			}
-			return true, 0
-		})
+				if remaining {
+					nc.zoneTerminationEvictor[k].Add(value.Value)
+				}
+				return true, 0
+			})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)
 	// TODO: replace with a controller that ensures pods that are terminating complete
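Both eviction loops now iterate over every zone's queue while holding the evictor lock, and the callback handed to Try returns (done, retryAfter): true/0 drops the node from the queue, false defers it. The toy queue below is only an illustrative reading of that contract, not the real RateLimitedTimedQueue:

```go
package main

import (
	"fmt"
	"time"
)

// timedValue mirrors the fields the callbacks above use.
type timedValue struct {
	Value   string
	AddedAt time.Time
}

// timedQueue is a toy per-zone queue: Try calls fn for each queued item;
// fn returns (done, retryAfter). done=true drops the item, done=false keeps
// it for a later pass (after retryAfter in the real, rate-limited queue).
type timedQueue struct {
	items []timedValue
}

func (q *timedQueue) Try(fn func(timedValue) (bool, time.Duration)) {
	var kept []timedValue
	for _, it := range q.items {
		if done, _ := fn(it); !done {
			kept = append(kept, it)
		}
	}
	q.items = kept
}

func main() {
	q := &timedQueue{items: []timedValue{{Value: "node-1", AddedAt: time.Now()}}}
	q.Try(func(v timedValue) (bool, time.Duration) {
		fmt.Println("evicting pods from", v.Value)
		return true, 0 // success: remove node-1 from the queue
	})
	fmt.Println("items left:", len(q.items))
}
```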
@@ -328,26 +333,28 @@ func (nc *NodeController) Run(period time.Duration) {
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
-			if err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
-				return false, 0
-			}
+		for k := range nc.zoneTerminationEvictor {
+			nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
+				completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
+					return false, 0
+				}
-			if completed {
-				glog.V(2).Infof("All pods terminated on %s", value.Value)
-				recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
-				return true, 0
-			}
+				if completed {
+					glog.V(2).Infof("All pods terminated on %s", value.Value)
+					recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
+					return true, 0
+				}
-			glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
-			// clamp very short intervals
-			if remaining < nodeEvictionPeriod {
-				remaining = nodeEvictionPeriod
-			}
-			return false, remaining
-		})
+				glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
+				// clamp very short intervals
+				if remaining < nodeEvictionPeriod {
+					remaining = nodeEvictionPeriod
+				}
+				return false, remaining
+			})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)
 	go wait.Until(func() {
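The termination loop in the hunk above also clamps the retry interval reported by terminatePods so a node is never re-checked more often than nodeEvictionPeriod. A tiny standalone version of that clamp (the period used here is just an example value, not the controller's constant):

```go
package main

import (
	"fmt"
	"time"
)

// clampRetry makes sure a requested retry delay is never shorter than the
// eviction loop period, mirroring the "clamp very short intervals" branch.
func clampRetry(remaining, nodeEvictionPeriod time.Duration) time.Duration {
	if remaining < nodeEvictionPeriod {
		return nodeEvictionPeriod
	}
	return remaining
}

func main() {
	period := 100 * time.Millisecond // example value
	fmt.Println(clampRetry(10*time.Millisecond, period)) // clamped up to 100ms
	fmt.Println(clampRetry(2*time.Second, period))       // left at 2s
}
```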
@@ -372,8 +379,19 @@ func (nc *NodeController) monitorNodeStatus() error {
 	for i := range added {
 		glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
 		recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
-		nc.cancelPodEviction(added[i])
 		nc.knownNodeSet[added[i].Name] = added[i]
+		// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
+		zone := utilnode.GetZoneKey(added[i])
+		if _, found := nc.zonePodEvictor[zone]; !found {
+			nc.zonePodEvictor[zone] =
+				NewRateLimitedTimedQueue(
+					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		if _, found := nc.zoneTerminationEvictor[zone]; !found {
+			nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
+				flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		nc.cancelPodEviction(added[i])
 	}
 	for i := range deleted {
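When monitorNodeStatus sees a newly added node, it buckets it by utilnode.GetZoneKey and lazily creates that zone's pod and termination evictors on first sight. The sketch below approximates a zone key from the failure-domain region/zone labels; the exact key format is whatever GetZoneKey actually produces, so treat the format here as an assumption:

```go
package main

import "fmt"

// node is a pared-down stand-in for api.Node; only labels matter here.
type node struct {
	Name   string
	Labels map[string]string
}

// zoneKey approximates utilnode.GetZoneKey: nodes are bucketed by the
// region/zone failure-domain labels. The real separator/format may differ.
func zoneKey(n node) string {
	region := n.Labels["failure-domain.beta.kubernetes.io/region"]
	zone := n.Labels["failure-domain.beta.kubernetes.io/zone"]
	return region + ":" + zone
}

func main() {
	evictors := map[string][]string{} // zone key -> queued node names (stand-in for the evictor map)
	added := []node{
		{Name: "node-1", Labels: map[string]string{
			"failure-domain.beta.kubernetes.io/region": "us-central1",
			"failure-domain.beta.kubernetes.io/zone":   "us-central1-a",
		}},
	}
	for _, n := range added {
		k := zoneKey(n)
		if _, found := evictors[k]; !found {
			evictors[k] = nil // in the controller this is where NewRateLimitedTimedQueue is built
		}
		fmt.Println(n.Name, "->", k)
	}
}
```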
@@ -689,10 +707,11 @@ func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added,
 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
 func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
+	zone := utilnode.GetZoneKey(node)
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	wasDeleting := nc.podEvictor.Remove(node.Name)
-	wasTerminating := nc.terminationEvictor.Remove(node.Name)
+	wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
+	wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
 	if wasDeleting || wasTerminating {
 		glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
 		return true
@@ -703,10 +722,18 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
 func (nc *NodeController) evictPods(node *api.Node) bool {
-	if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation {
-		return false
-	}
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	return nc.podEvictor.Add(node.Name)
+	foundHealty := false
+	for _, state := range nc.zoneStates {
+		if state != stateFullSegmentation {
+			foundHealty = true
+			break
+		}
+	}
+	if !foundHealty {
+		return false
+	}
+	zone := utilnode.GetZoneKey(node)
+	return nc.zonePodEvictor[zone].Add(node.Name)
 }
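evictPods now refuses to queue an eviction when every known zone is fully segmented, presumably because a cluster where no zone looks reachable points at a controller-side connectivity problem rather than dead nodes. A stripped-down version of that guard, with stand-in definitions for zoneState and its values:

```go
package main

import "fmt"

// zoneState and stateFullSegmentation stand in for the controller's types.
type zoneState string

const (
	stateFullSegmentation zoneState = "fullSegmentation"
	stateNormal           zoneState = "normal"
)

// anyZoneHealthy reports whether at least one zone is not fully segmented;
// if none is, the controller skips queueing evictions entirely.
func anyZoneHealthy(states map[string]zoneState) bool {
	for _, s := range states {
		if s != stateFullSegmentation {
			return true
		}
	}
	return false
}

func main() {
	states := map[string]zoneState{"zone-a": stateFullSegmentation, "zone-b": stateNormal}
	fmt.Println("evict allowed:", anyZoneHealthy(states)) // true: zone-b still looks healthy
}
```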