Add incoming pod metrics to scheduler queue.

This commit is contained in:
Cong Liu
2019-10-07 13:29:53 -04:00
parent 6b2b5f25fd
commit fc226e0670
7 changed files with 249 additions and 126 deletions

View File

@@ -31,6 +31,7 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/internal/queue"
)
func (sched *Scheduler) onPvAdd(obj interface{}) {
@@ -40,7 +41,7 @@ func (sched *Scheduler) onPvAdd(obj interface{}) {
// provisioning and binding process, will not trigger events to schedule pod
// again. So we need to move pods to active queue on PV add for this
// scenario.
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvAdd)
}
func (sched *Scheduler) onPvUpdate(old, new interface{}) {
@@ -48,15 +49,15 @@ func (sched *Scheduler) onPvUpdate(old, new interface{}) {
// bindings due to conflicts if PVs are updated by PV controller or other
// parties, then scheduler will add pod back to unschedulable queue. We
// need to move pods to active queue on PV update for this scenario.
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvUpdate)
}
func (sched *Scheduler) onPvcAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcAdd)
}
func (sched *Scheduler) onPvcUpdate(old, new interface{}) {
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.PvcUpdate)
}
func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
@@ -73,20 +74,20 @@ func (sched *Scheduler) onStorageClassAdd(obj interface{}) {
// We don't need to invalidate cached results because results will not be
// cached for pod that has unbound immediate PVCs.
if sc.VolumeBindingMode != nil && *sc.VolumeBindingMode == storagev1.VolumeBindingWaitForFirstConsumer {
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.StorageClassAdd)
}
}
func (sched *Scheduler) onServiceAdd(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceAdd)
}
func (sched *Scheduler) onServiceUpdate(oldObj interface{}, newObj interface{}) {
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceUpdate)
}
func (sched *Scheduler) onServiceDelete(obj interface{}) {
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.ServiceDelete)
}
func (sched *Scheduler) addNodeToCache(obj interface{}) {
@@ -100,7 +101,7 @@ func (sched *Scheduler) addNodeToCache(obj interface{}) {
klog.Errorf("scheduler cache AddNode failed: %v", err)
}
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.NodeAdd)
}
func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
@@ -124,8 +125,10 @@ func (sched *Scheduler) updateNodeInCache(oldObj, newObj interface{}) {
// to save processing cycles. We still trigger a move to active queue to cover the case
// that a pod being processed by the scheduler is determined unschedulable. We want this
// pod to be reevaluated when a change in the cluster happens.
if sched.SchedulingQueue.NumUnschedulablePods() == 0 || nodeSchedulingPropertiesChanged(newNode, oldNode) {
sched.SchedulingQueue.MoveAllToActiveQueue()
if sched.SchedulingQueue.NumUnschedulablePods() == 0 {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.Unknown)
} else if event := nodeSchedulingPropertiesChange(newNode, oldNode); event != "" {
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(event)
}
}
@@ -166,7 +169,7 @@ func (sched *Scheduler) onCSINodeAdd(obj interface{}) {
klog.Errorf("scheduler cache AddCSINode failed: %v", err)
}
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeAdd)
}
func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
@@ -186,7 +189,7 @@ func (sched *Scheduler) onCSINodeUpdate(oldObj, newObj interface{}) {
klog.Errorf("scheduler cache UpdateCSINode failed: %v", err)
}
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.CSINodeUpdate)
}
func (sched *Scheduler) onCSINodeDelete(obj interface{}) {
@@ -315,7 +318,7 @@ func (sched *Scheduler) deletePodFromCache(obj interface{}) {
klog.Errorf("scheduler cache RemovePod failed: %v", err)
}
sched.SchedulingQueue.MoveAllToActiveQueue()
sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(queue.AssignedPodDelete)
}
// assignedPod selects pods that are assigned (scheduled and running).
@@ -488,24 +491,24 @@ func AddAllEventHandlers(
)
}
func nodeSchedulingPropertiesChanged(newNode *v1.Node, oldNode *v1.Node) bool {
func nodeSchedulingPropertiesChange(newNode *v1.Node, oldNode *v1.Node) string {
if nodeSpecUnschedulableChanged(newNode, oldNode) {
return true
return queue.NodeSpecUnschedulableChange
}
if nodeAllocatableChanged(newNode, oldNode) {
return true
return queue.NodeAllocatableChange
}
if nodeLabelsChanged(newNode, oldNode) {
return true
return queue.NodeLabelChange
}
if nodeTaintsChanged(newNode, oldNode) {
return true
return queue.NodeTaintChange
}
if nodeConditionsChanged(newNode, oldNode) {
return true
return queue.NodeConditionChange
}
return false
return ""
}
func nodeAllocatableChanged(newNode *v1.Node, oldNode *v1.Node) bool {